From 108691f3ce31f247ed6499ecc5f14708fd94a6eb Mon Sep 17 00:00:00 2001 From: Viraj Mehta Date: Sat, 20 Dec 2025 23:14:24 -0500 Subject: [PATCH 1/4] changed durables API to use rust durations --- Cargo.lock | 213 +++++++++++++++++++++++++++++++++++++- Cargo.toml | 1 + benches/checkpoint.rs | 16 +-- benches/common/setup.rs | 4 +- benches/concurrency.rs | 16 ++- benches/throughput.rs | 10 +- src/client.rs | 6 +- src/context.rs | 20 ++-- src/types.rs | 78 ++++++++------ src/worker.rs | 18 ++-- tests/checkpoint_test.rs | 44 ++++---- tests/concurrency_test.rs | 8 +- tests/crash_test.rs | 68 ++++++------ tests/event_test.rs | 44 ++++---- tests/execution_test.rs | 60 +++++------ tests/fanout_test.rs | 36 +++---- tests/lease_test.rs | 20 ++-- tests/partition_test.rs | 22 ++-- tests/retry_test.rs | 28 ++--- tests/spawn_test.rs | 17 +-- tests/telemetry_test.rs | 20 ++-- 21 files changed, 507 insertions(+), 242 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e26e152..64379bc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -382,6 +382,41 @@ dependencies = [ "typenum", ] +[[package]] +name = "darling" +version = "0.21.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9cdf337090841a411e2a7f3deb9187445851f91b309c0c0a29e05f74a00a48c0" +dependencies = [ + "darling_core", + "darling_macro", +] + +[[package]] +name = "darling_core" +version = "0.21.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1247195ecd7e3c85f83c8d2a366e4210d588e802133e1e355180a9870b517ea4" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim", + "syn", +] + +[[package]] +name = "darling_macro" +version = "0.21.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d38308df82d1080de0afee5d069fa14b0326a88c14f15c5ccda35b4a6c414c81" +dependencies = [ + "darling_core", + "quote", + "syn", +] + [[package]] name = "der" version = "0.7.10" @@ -393,6 +428,16 @@ dependencies = [ "zeroize", ] +[[package]] +name = "deranged" +version = "0.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ececcb659e7ba858fb4f10388c250a7252eb0a27373f1a72b8748afdd248e587" +dependencies = [ + "powerfmt", + "serde_core", +] + [[package]] name = "digest" version = "0.10.7" @@ -441,6 +486,7 @@ dependencies = [ "rand 0.9.2", "serde", "serde_json", + "serde_with", "sqlx", "thiserror 2.0.17", "tokio", @@ -451,6 +497,12 @@ dependencies = [ "uuid", ] +[[package]] +name = "dyn-clone" +version = "1.0.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0881ea181b1df73ff77ffaaf9c7544ecc11e82fba9b5f27b262a3c73a332555" + [[package]] name = "either" version = "1.15.0" @@ -511,6 +563,12 @@ dependencies = [ "spin", ] +[[package]] +name = "fnv" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" + [[package]] name = "foldhash" version = "0.1.5" @@ -682,6 +740,12 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "hashbrown" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" + [[package]] name = "hashbrown" version = "0.15.5" @@ -872,6 +936,12 @@ dependencies = [ "zerovec", ] +[[package]] +name = "ident_case" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" + [[package]] name = "idna" version = "1.1.0" @@ -893,6 +963,17 @@ dependencies = [ "icu_properties", ] +[[package]] +name = "indexmap" +version = "1.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" +dependencies = [ + "autocfg", + "hashbrown 0.12.3", + "serde", +] + [[package]] name = "indexmap" version = "2.12.1" @@ -901,6 +982,8 @@ checksum = "0ad4bb2b565bca0645f4d68c5c9af97fba094e9791da685bf83cb5f3ce74acf2" dependencies = [ "equivalent", "hashbrown 0.16.1", + "serde", + "serde_core", ] [[package]] @@ -1038,7 +1121,7 @@ dependencies = [ "crossbeam-epoch", "crossbeam-utils", "hashbrown 0.15.5", - "indexmap", + "indexmap 2.12.1", "metrics", "ordered-float", "quanta", @@ -1091,6 +1174,12 @@ dependencies = [ "zeroize", ] +[[package]] +name = "num-conv" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" + [[package]] name = "num-integer" version = "0.1.46" @@ -1301,6 +1390,12 @@ dependencies = [ "zerovec", ] +[[package]] +name = "powerfmt" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" + [[package]] name = "ppv-lite86" version = "0.2.21" @@ -1456,6 +1551,26 @@ dependencies = [ "bitflags", ] +[[package]] +name = "ref-cast" +version = "1.0.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f354300ae66f76f1c85c5f84693f0ce81d747e2c3f21a45fef496d89c960bf7d" +dependencies = [ + "ref-cast-impl", +] + +[[package]] +name = "ref-cast-impl" +version = "1.0.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7186006dcb21920990093f30e3dea63b7d6e977bf1256be20c3563a5db070da" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "regex" version = "1.12.2" @@ -1574,6 +1689,30 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "schemars" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4cd191f9397d57d581cddd31014772520aa448f65ef991055d7f61582c65165f" +dependencies = [ + "dyn-clone", + "ref-cast", + "serde", + "serde_json", +] + +[[package]] +name = "schemars" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9558e172d4e8533736ba97870c4b2cd63f84b382a3d6eb063da41b91cce17289" +dependencies = [ + "dyn-clone", + "ref-cast", + "serde", + "serde_json", +] + [[package]] name = "scopeguard" version = "1.2.0" @@ -1644,6 +1783,37 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_with" +version = "3.16.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fa237f2807440d238e0364a218270b98f767a00d3dada77b1c53ae88940e2e7" +dependencies = [ + "base64", + "chrono", + "hex", + "indexmap 1.9.3", + "indexmap 2.12.1", + "schemars 0.9.0", + "schemars 1.1.0", + "serde_core", + "serde_json", + "serde_with_macros", + "time", +] + +[[package]] +name = "serde_with_macros" +version = "3.16.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52a8e3ca0ca629121f70ab50f95249e5a6f925cc0f6ffe8256c45b728875706c" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "sha1" version = "0.10.6" @@ -1783,7 +1953,7 @@ dependencies = [ "futures-util", "hashbrown 0.16.1", "hashlink", - "indexmap", + "indexmap 2.12.1", "log", "memchr", "percent-encoding", @@ -1965,6 +2135,12 @@ dependencies = [ "unicode-properties", ] +[[package]] +name = "strsim" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" + [[package]] name = "subtle" version = "2.6.1" @@ -2042,6 +2218,37 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "time" +version = "0.3.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91e7d9e3bb61134e77bde20dd4825b97c010155709965fedf0f49bb138e52a9d" +dependencies = [ + "deranged", + "itoa", + "num-conv", + "powerfmt", + "serde", + "time-core", + "time-macros", +] + +[[package]] +name = "time-core" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40868e7c1d2f0b8d73e4a8c7f0ff63af4f6d19be117e90bd73eb1d62cf831c6b" + +[[package]] +name = "time-macros" +version = "0.2.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30cfb0125f12d9c277f35663a0a33f8c30190f4e4574868a330595412d34ebf3" +dependencies = [ + "num-conv", + "time-core", +] + [[package]] name = "tinystr" version = "0.8.2" @@ -2143,7 +2350,7 @@ version = "0.22.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "41fe8c660ae4257887cf66394862d21dbca4a6ddd26f04a3560410406a2f819a" dependencies = [ - "indexmap", + "indexmap 2.12.1", "serde", "serde_spanned", "toml_datetime", diff --git a/Cargo.toml b/Cargo.toml index 4895fe8..18d7585 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ tokio = { version = "1", features = ["full"] } sqlx = { version = "0.9.0-alpha.1", features = ["sqlx-toml", "postgres", "runtime-tokio", "chrono", "tls-rustls", "uuid", "migrate"] } serde = { version = "1", features = ["derive"] } serde_json = "1" +serde_with = "3" anyhow = "1" thiserror = "2" async-trait = "0.1" diff --git a/benches/checkpoint.rs b/benches/checkpoint.rs index 84852e9..753900a 100644 --- a/benches/checkpoint.rs +++ b/benches/checkpoint.rs @@ -39,8 +39,10 @@ fn bench_step_cache_miss(c: &mut Criterion) { .unwrap(); let start = std::time::Instant::now(); - let worker = - ctx.client.start_worker(bench_worker_options(1, 120)).await; + let worker = ctx + .client + .start_worker(bench_worker_options(1, Duration::from_secs(120))) + .await; wait_for_tasks_complete(&ctx.pool, &ctx.queue_name, 1, 60).await; total_time += start.elapsed(); @@ -91,7 +93,7 @@ fn bench_step_cache_hit(c: &mut Criterion) { let worker = ctx .client - .start_worker(bench_worker_options(1, 120)) + .start_worker(bench_worker_options(1, Duration::from_secs(120))) .await; wait_for_tasks_complete(&ctx.pool, &ctx.queue_name, 1, 60).await; @@ -123,7 +125,7 @@ fn bench_step_cache_hit(c: &mut Criterion) { let start = std::time::Instant::now(); let worker = ctx .client - .start_worker(bench_worker_options(1, 120)) + .start_worker(bench_worker_options(1, Duration::from_secs(120))) .await; wait_for_tasks_complete(&ctx.pool, &ctx.queue_name, 1, 60).await; @@ -177,8 +179,10 @@ fn bench_large_payload_checkpoint(c: &mut Criterion) { .unwrap(); let start = std::time::Instant::now(); - let worker = - ctx.client.start_worker(bench_worker_options(1, 120)).await; + let worker = ctx + .client + .start_worker(bench_worker_options(1, Duration::from_secs(120))) + .await; wait_for_tasks_complete(&ctx.pool, &ctx.queue_name, 1, 120).await; total_time += start.elapsed(); diff --git a/benches/common/setup.rs b/benches/common/setup.rs index f1ebbaf..7130e87 100644 --- a/benches/common/setup.rs +++ b/benches/common/setup.rs @@ -120,11 +120,11 @@ pub async fn clear_completed_tasks(pool: &PgPool, queue: &str) { } /// Default worker options optimized for benchmarking -pub fn bench_worker_options(concurrency: usize, claim_timeout: u64) -> WorkerOptions { +pub fn bench_worker_options(concurrency: usize, claim_timeout: Duration) -> WorkerOptions { WorkerOptions { worker_id: None, concurrency, - poll_interval: 0.001, // Very fast polling for accurate timing + poll_interval: Duration::from_millis(1), // Very fast polling for accurate timing claim_timeout, batch_size: None, // Use default (= concurrency) fatal_on_lease_timeout: false, diff --git a/benches/concurrency.rs b/benches/concurrency.rs index 263af3b..1ff2cd3 100644 --- a/benches/concurrency.rs +++ b/benches/concurrency.rs @@ -57,8 +57,12 @@ fn bench_concurrent_claims(c: &mut Criterion) { // Sync all workers to start together barrier.wait().await; - let worker = - client.start_worker(bench_worker_options(1, 60)).await; + let worker = client + .start_worker(bench_worker_options( + 1, + Duration::from_secs(60), + )) + .await; // Wait a bit then shutdown tokio::time::sleep(Duration::from_secs(15)).await; @@ -142,8 +146,12 @@ fn bench_claim_latency_distribution(c: &mut Criterion) { let handle = tokio::spawn(async move { barrier.wait().await; - let worker = - client.start_worker(bench_worker_options(4, 60)).await; + let worker = client + .start_worker(bench_worker_options( + 4, + Duration::from_secs(60), + )) + .await; tokio::time::sleep(Duration::from_secs(20)).await; worker.shutdown().await; diff --git a/benches/throughput.rs b/benches/throughput.rs index 25fbeca..af1d011 100644 --- a/benches/throughput.rs +++ b/benches/throughput.rs @@ -72,7 +72,10 @@ fn bench_task_throughput(c: &mut Criterion) { let start = std::time::Instant::now(); let worker = ctx .client - .start_worker(bench_worker_options(concurrency, 60)) + .start_worker(bench_worker_options( + concurrency, + Duration::from_secs(60), + )) .await; wait_for_tasks_complete( @@ -112,7 +115,10 @@ fn bench_e2e_completion(c: &mut Criterion) { let ctx = BenchContext::new().await; ctx.client.register::().await.unwrap(); - let worker = ctx.client.start_worker(bench_worker_options(1, 60)).await; + let worker = ctx + .client + .start_worker(bench_worker_options(1, Duration::from_secs(60))) + .await; let start = std::time::Instant::now(); for i in 0..iters { diff --git a/src/client.rs b/src/client.rs index b685087..70432a4 100644 --- a/src/client.rs +++ b/src/client.rs @@ -36,12 +36,12 @@ struct CancellationPolicyDb { impl CancellationPolicyDb { fn from_policy(policy: &CancellationPolicy) -> Option { - if policy.max_delay.is_none() && policy.max_duration.is_none() { + if policy.max_pending_time.is_none() && policy.max_running_time.is_none() { None } else { Some(Self { - max_delay: policy.max_delay, - max_duration: policy.max_duration, + max_delay: policy.max_pending_time.map(|d| d.as_secs()), + max_duration: policy.max_running_time.map(|d| d.as_secs()), }) } } diff --git a/src/context.rs b/src/context.rs index 80edad6..b49eefa 100644 --- a/src/context.rs +++ b/src/context.rs @@ -56,7 +56,7 @@ where queue_name: String, #[allow(dead_code)] task: ClaimedTask, - claim_timeout: u64, + claim_timeout: Duration, state: State, @@ -94,7 +94,7 @@ where pool: PgPool, queue_name: String, task: ClaimedTask, - claim_timeout: u64, + claim_timeout: Duration, lease_extender: LeaseExtender, registry: Arc>>, state: State, @@ -274,15 +274,14 @@ where .bind(name) .bind(&state_json) .bind(self.run_id) - .bind(self.claim_timeout as i32) + .bind(self.claim_timeout.as_secs() as i32) .execute(&self.pool) .await?; self.checkpoint_cache.insert(name.to_string(), state_json); // Notify worker that lease was extended so it can reset timers - self.lease_extender - .notify(Duration::from_secs(self.claim_timeout)); + self.lease_extender.notify(self.claim_timeout); Ok(()) } @@ -455,21 +454,18 @@ where ) )] pub async fn heartbeat(&self, duration: Option) -> TaskResult<()> { - let extend_by = duration - .map(|d| d.as_secs() as i32) - .unwrap_or(self.claim_timeout as i32); + let extend_by = duration.unwrap_or(self.claim_timeout); let query = "SELECT durable.extend_claim($1, $2, $3)"; sqlx::query(query) .bind(&self.queue_name) .bind(self.run_id) - .bind(extend_by) + .bind(extend_by.as_secs() as i32) .execute(&self.pool) .await?; // Notify worker that lease was extended so it can reset timers - self.lease_extender - .notify(Duration::from_secs(extend_by as u64)); + self.lease_extender.notify(extend_by); Ok(()) } @@ -834,7 +830,7 @@ mod tests { headers: None, wake_event: None, }, - 10, + Duration::from_secs(10), LeaseExtender::dummy_for_tests(), Arc::new(RwLock::new(TaskRegistry::new())), (), diff --git a/src/types.rs b/src/types.rs index 8954cd4..5861750 100644 --- a/src/types.rs +++ b/src/types.rs @@ -1,19 +1,21 @@ use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use serde_json::Value as JsonValue; +use serde_with::{DurationSeconds, serde_as}; use std::collections::HashMap; use std::marker::PhantomData; +use std::time::Duration; use uuid::Uuid; // Default value functions for RetryStrategy -fn default_base_seconds() -> u64 { - 5 +fn default_base_delay() -> Duration { + Duration::from_secs(5) } fn default_factor() -> f64 { 2.0 } -fn default_max_seconds() -> u64 { - 300 +fn default_max_backoff() -> Duration { + Duration::from_secs(300) } /// Retry strategy for failed tasks. @@ -24,17 +26,19 @@ fn default_max_seconds() -> u64 { /// # Example /// /// ``` +/// use std::time::Duration; /// use durable::{RetryStrategy, SpawnOptions}; /// /// let options = SpawnOptions { /// retry_strategy: Some(RetryStrategy::Exponential { -/// base_seconds: 1, +/// base_delay: Duration::from_secs(1), /// factor: 2.0, -/// max_seconds: 60, +/// max_backoff: Duration::from_secs(60), /// }), /// ..Default::default() /// }; /// ``` +#[serde_as] #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "kind", rename_all = "snake_case")] pub enum RetryStrategy { @@ -43,29 +47,32 @@ pub enum RetryStrategy { /// Fixed delay between retries Fixed { - /// Delay in seconds between retry attempts (default: 5) - #[serde(default = "default_base_seconds")] - base_seconds: u64, + /// Delay between retry attempts (default: 5 seconds) + #[serde(default = "default_base_delay", rename = "base_seconds")] + #[serde_as(as = "DurationSeconds")] + base_delay: Duration, }, - /// Exponential backoff: delay = base_seconds * (factor ^ (attempt - 1)) + /// Exponential backoff: delay = base_delay * (factor ^ (attempt - 1)) Exponential { - /// Initial delay in seconds (default: 5) - #[serde(default = "default_base_seconds")] - base_seconds: u64, + /// Initial delay (default: 5 seconds) + #[serde(default = "default_base_delay", rename = "base_seconds")] + #[serde_as(as = "DurationSeconds")] + base_delay: Duration, /// Multiplier for each subsequent attempt (default: 2.0) #[serde(default = "default_factor")] factor: f64, - /// Maximum delay cap in seconds (default: 300) - #[serde(default = "default_max_seconds")] - max_seconds: u64, + /// Maximum delay cap (default: 300 seconds) + #[serde(default = "default_max_backoff", rename = "max_seconds")] + #[serde_as(as = "DurationSeconds")] + max_backoff: Duration, }, } impl Default for RetryStrategy { fn default() -> Self { Self::Fixed { - base_seconds: default_base_seconds(), + base_delay: default_base_delay(), } } } @@ -74,17 +81,20 @@ impl Default for RetryStrategy { /// /// Allows tasks to be automatically cancelled based on how long they've been /// waiting or running. Useful for preventing stale tasks from consuming resources. +#[serde_as] #[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct CancellationPolicy { - /// Cancel if task has been pending for more than this many seconds. + /// Cancel if task has been pending for more than this duration. /// Checked when the task would be claimed. - #[serde(skip_serializing_if = "Option::is_none")] - pub max_delay: Option, + #[serde(skip_serializing_if = "Option::is_none", rename = "max_delay")] + #[serde_as(as = "Option>")] + pub max_pending_time: Option, - /// Cancel if task has been running for more than this many seconds total + /// Cancel if task has been running for more than this duration total /// (across all attempts). Checked on retry. - #[serde(skip_serializing_if = "Option::is_none")] - pub max_duration: Option, + #[serde(skip_serializing_if = "Option::is_none", rename = "max_duration")] + #[serde_as(as = "Option>")] + pub max_running_time: Option, } /// Options for spawning a task. @@ -94,14 +104,15 @@ pub struct CancellationPolicy { /// # Example /// /// ``` +/// use std::time::Duration; /// use durable::{SpawnOptions, RetryStrategy}; /// /// let options = SpawnOptions { /// max_attempts: Some(3), /// retry_strategy: Some(RetryStrategy::Exponential { -/// base_seconds: 5, +/// base_delay: Duration::from_secs(5), /// factor: 2.0, -/// max_seconds: 300, +/// max_backoff: Duration::from_secs(300), /// }), /// ..Default::default() /// }; @@ -130,12 +141,13 @@ pub struct SpawnOptions { /// # Example /// /// ``` +/// use std::time::Duration; /// use durable::WorkerOptions; /// /// let options = WorkerOptions { /// concurrency: 4, -/// claim_timeout: 120, -/// poll_interval: 0.5, +/// claim_timeout: Duration::from_secs(120), +/// poll_interval: Duration::from_millis(500), /// ..Default::default() /// }; /// ``` @@ -144,9 +156,9 @@ pub struct WorkerOptions { /// Unique worker identifier (default: hostname:pid) pub worker_id: Option, - /// Task lease duration in seconds (default: 120). + /// Task lease duration (default: 120 seconds). /// Tasks must complete or checkpoint within this time. - pub claim_timeout: u64, + pub claim_timeout: Duration, /// Maximum tasks to claim per poll (default: same as concurrency) pub batch_size: Option, @@ -154,8 +166,8 @@ pub struct WorkerOptions { /// Maximum parallel task executions (default: 1) pub concurrency: usize, - /// Seconds between polls when queue is empty (default: 0.25) - pub poll_interval: f64, + /// Interval between polls when queue is empty (default: 250ms) + pub poll_interval: Duration, /// Terminate process if task exceeds 2x claim_timeout (default: false). /// When false, the task is aborted but other tasks continue running. @@ -167,10 +179,10 @@ impl Default for WorkerOptions { fn default() -> Self { Self { worker_id: None, - claim_timeout: 120, + claim_timeout: Duration::from_secs(120), batch_size: None, concurrency: 1, - poll_interval: 0.25, + poll_interval: Duration::from_millis(250), fatal_on_lease_timeout: false, } } diff --git a/src/worker.rs b/src/worker.rs index ce6ead9..c93619b 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -123,7 +123,7 @@ impl Worker { let concurrency = options.concurrency; let batch_size = options.batch_size.unwrap_or(concurrency); let claim_timeout = options.claim_timeout; - let poll_interval = std::time::Duration::from_secs_f64(options.poll_interval); + let poll_interval = options.poll_interval; let fatal_on_lease_timeout = options.fatal_on_lease_timeout; // Mark worker as active @@ -223,7 +223,7 @@ impl Worker { pool: &PgPool, queue_name: &str, worker_id: &str, - claim_timeout: u64, + claim_timeout: Duration, count: usize, ) -> anyhow::Result> { #[cfg(feature = "telemetry")] @@ -236,7 +236,7 @@ impl Worker { let rows: Vec = sqlx::query_as(query) .bind(queue_name) .bind(worker_id) - .bind(claim_timeout as i32) + .bind(claim_timeout.as_secs() as i32) .bind(count as i32) .fetch_all(pool) .await?; @@ -263,7 +263,7 @@ impl Worker { queue_name: String, registry: Arc>>, task: ClaimedTask, - claim_timeout: u64, + claim_timeout: Duration, fatal_on_lease_timeout: bool, state: State, ) where @@ -305,7 +305,7 @@ impl Worker { queue_name: String, registry: Arc>>, task: ClaimedTask, - claim_timeout: u64, + claim_timeout: Duration, fatal_on_lease_timeout: bool, state: State, ) where @@ -377,7 +377,7 @@ impl Worker { let timer_handle = tokio::spawn({ let task_label = task_label.clone(); async move { - let mut warn_duration = Duration::from_secs(claim_timeout); + let mut warn_duration = claim_timeout; let mut fatal_duration = warn_duration * 2; let mut warn_fired = false; let mut deadline = Instant::now(); @@ -414,7 +414,7 @@ impl Worker { tracing::warn!( "Task {} exceeded claim timeout of {}s (no heartbeat/step since last extension)", task_label, - claim_timeout + claim_timeout.as_secs() ); warn_fired = true; } @@ -449,7 +449,7 @@ impl Worker { task_id = %task_id, run_id = %run_id, elapsed_secs = elapsed.as_secs(), - claim_timeout_secs = claim_timeout, + claim_timeout_secs = claim_timeout.as_secs(), "Task {} exceeded 2x claim timeout without heartbeat; terminating process", task_label ); @@ -459,7 +459,7 @@ impl Worker { task_id = %task_id, run_id = %run_id, elapsed_secs = elapsed.as_secs(), - claim_timeout_secs = claim_timeout, + claim_timeout_secs = claim_timeout.as_secs(), "Task {} exceeded 2x claim timeout without heartbeat; aborting task", task_label ); diff --git a/tests/checkpoint_test.rs b/tests/checkpoint_test.rs index 7ecc1bb..6977adc 100644 --- a/tests/checkpoint_test.rs +++ b/tests/checkpoint_test.rs @@ -42,7 +42,9 @@ async fn test_checkpoint_prevents_step_reexecution(pool: PgPool) -> sqlx::Result fail_after_step2: true, }, SpawnOptions { - retry_strategy: Some(RetryStrategy::Fixed { base_seconds: 0 }), + retry_strategy: Some(RetryStrategy::Fixed { + base_delay: Duration::from_secs(0), + }), max_attempts: Some(2), ..Default::default() }, @@ -53,8 +55,8 @@ async fn test_checkpoint_prevents_step_reexecution(pool: PgPool) -> sqlx::Result // Start worker let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), ..Default::default() }) .await; @@ -94,8 +96,8 @@ async fn test_checkpoint_prevents_step_reexecution(pool: PgPool) -> sqlx::Result let worker2 = client2 .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), ..Default::default() }) .await; @@ -138,7 +140,9 @@ async fn test_deterministic_rand_preserved_on_retry(pool: PgPool) -> sqlx::Resul fail_on_first_attempt: true, }, SpawnOptions { - retry_strategy: Some(RetryStrategy::Fixed { base_seconds: 0 }), + retry_strategy: Some(RetryStrategy::Fixed { + base_delay: Duration::from_secs(0), + }), max_attempts: Some(2), ..Default::default() }, @@ -148,8 +152,8 @@ async fn test_deterministic_rand_preserved_on_retry(pool: PgPool) -> sqlx::Resul let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), ..Default::default() }) .await; @@ -199,7 +203,9 @@ async fn test_deterministic_now_preserved_on_retry(pool: PgPool) -> sqlx::Result fail_on_first_attempt: true, }, SpawnOptions { - retry_strategy: Some(RetryStrategy::Fixed { base_seconds: 0 }), + retry_strategy: Some(RetryStrategy::Fixed { + base_delay: Duration::from_secs(0), + }), max_attempts: Some(2), ..Default::default() }, @@ -209,8 +215,8 @@ async fn test_deterministic_now_preserved_on_retry(pool: PgPool) -> sqlx::Result let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), ..Default::default() }) .await; @@ -257,7 +263,9 @@ async fn test_deterministic_uuid7_preserved_on_retry(pool: PgPool) -> sqlx::Resu fail_on_first_attempt: true, }, SpawnOptions { - retry_strategy: Some(RetryStrategy::Fixed { base_seconds: 0 }), + retry_strategy: Some(RetryStrategy::Fixed { + base_delay: Duration::from_secs(0), + }), max_attempts: Some(2), ..Default::default() }, @@ -267,8 +275,8 @@ async fn test_deterministic_uuid7_preserved_on_retry(pool: PgPool) -> sqlx::Resu let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), ..Default::default() }) .await; @@ -316,8 +324,8 @@ async fn test_long_workflow_many_steps(pool: PgPool) -> sqlx::Result<()> { let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 60, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(60), ..Default::default() }) .await; @@ -368,8 +376,8 @@ async fn test_large_payload_checkpoint(pool: PgPool) -> sqlx::Result<()> { let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 60, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(60), ..Default::default() }) .await; diff --git a/tests/concurrency_test.rs b/tests/concurrency_test.rs index 22f9a92..6e6cc6a 100644 --- a/tests/concurrency_test.rs +++ b/tests/concurrency_test.rs @@ -57,8 +57,8 @@ async fn test_task_claimed_by_exactly_one_worker(pool: PgPool) -> sqlx::Result<( let worker = client .start_worker(WorkerOptions { - poll_interval: 0.01, // Fast polling - claim_timeout: 30, + poll_interval: Duration::from_millis(10), // Fast polling + claim_timeout: Duration::from_secs(30), concurrency: 1, ..Default::default() }) @@ -156,8 +156,8 @@ async fn test_concurrent_claims_with_skip_locked(pool: PgPool) -> sqlx::Result<( let worker = client .start_worker(WorkerOptions { - poll_interval: 0.01, // Fast polling to maximize contention - claim_timeout: 30, + poll_interval: Duration::from_millis(10), // Fast polling to maximize contention + claim_timeout: Duration::from_secs(30), concurrency: 5, // Each worker handles multiple tasks ..Default::default() }) diff --git a/tests/crash_test.rs b/tests/crash_test.rs index ab1fde3..db707c0 100644 --- a/tests/crash_test.rs +++ b/tests/crash_test.rs @@ -42,7 +42,9 @@ async fn test_crash_mid_step_resumes_from_checkpoint(pool: PgPool) -> sqlx::Resu fail_after_step2: true, }, SpawnOptions { - retry_strategy: Some(RetryStrategy::Fixed { base_seconds: 0 }), + retry_strategy: Some(RetryStrategy::Fixed { + base_delay: Duration::from_secs(0), + }), max_attempts: Some(3), ..Default::default() }, @@ -54,8 +56,8 @@ async fn test_crash_mid_step_resumes_from_checkpoint(pool: PgPool) -> sqlx::Resu { let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), ..Default::default() }) .await; @@ -77,8 +79,8 @@ async fn test_crash_mid_step_resumes_from_checkpoint(pool: PgPool) -> sqlx::Resu // Second worker picks up the task let worker2 = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 5, // Short timeout to reclaim quickly + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(5), // Short timeout to reclaim quickly ..Default::default() }) .await; @@ -113,7 +115,7 @@ async fn test_worker_drop_without_shutdown(pool: PgPool) -> sqlx::Result<()> { client.create_queue(None).await.unwrap(); client.register::().await.unwrap(); - let claim_timeout = 2; // 2 second lease + let claim_timeout = Duration::from_secs(2); // 2 second lease // Spawn a slow task that will outlive the lease let spawn_result = client @@ -127,7 +129,7 @@ async fn test_worker_drop_without_shutdown(pool: PgPool) -> sqlx::Result<()> { { let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, + poll_interval: Duration::from_millis(50), claim_timeout, ..Default::default() }) @@ -145,13 +147,13 @@ async fn test_worker_drop_without_shutdown(pool: PgPool) -> sqlx::Result<()> { } // Wait for real time to pass the lease timeout - tokio::time::sleep(Duration::from_secs(claim_timeout + 1)).await; + tokio::time::sleep(claim_timeout + Duration::from_secs(1)).await; // Second worker should reclaim the task let worker2 = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 60, // Longer timeout for second worker + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(60), // Longer timeout for second worker ..Default::default() }) .await; @@ -184,7 +186,7 @@ async fn test_lease_expiration_allows_reclaim(pool: PgPool) -> sqlx::Result<()> let start_time = chrono::Utc::now(); set_fake_time(&pool, start_time).await?; - let claim_timeout = 2; // 2 second lease + let claim_timeout = Duration::from_secs(2); // 2 second lease // Spawn a long-running task that heartbeats (but we'll let the lease expire) let spawn_result = client @@ -198,7 +200,7 @@ async fn test_lease_expiration_allows_reclaim(pool: PgPool) -> sqlx::Result<()> // First worker claims the task let worker1 = client .start_worker(WorkerOptions { - poll_interval: 0.05, + poll_interval: Duration::from_millis(50), claim_timeout, ..Default::default() }) @@ -214,13 +216,13 @@ async fn test_lease_expiration_allows_reclaim(pool: PgPool) -> sqlx::Result<()> worker1.shutdown().await; // Advance time past the lease timeout - advance_time(&pool, claim_timeout as i64 + 1).await?; + advance_time(&pool, claim_timeout.as_secs() as i64 + 1).await?; // Second worker should be able to reclaim let worker2 = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 60, // Longer timeout this time + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(60), // Longer timeout this time ..Default::default() }) .await; @@ -248,7 +250,7 @@ async fn test_heartbeat_prevents_lease_expiration(pool: PgPool) -> sqlx::Result< client.create_queue(None).await.unwrap(); client.register::().await.unwrap(); - let claim_timeout = 2; // 2 second lease + let claim_timeout = Duration::from_secs(2); // 2 second lease // Spawn a task that runs for 5 seconds with frequent heartbeats let spawn_result = client @@ -261,7 +263,7 @@ async fn test_heartbeat_prevents_lease_expiration(pool: PgPool) -> sqlx::Result< let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, + poll_interval: Duration::from_millis(50), claim_timeout, ..Default::default() }) @@ -309,8 +311,8 @@ async fn test_spawn_idempotency_after_retry(pool: PgPool) -> sqlx::Result<()> { let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), concurrency: 2, // Handle parent and child ..Default::default() }) @@ -372,7 +374,9 @@ async fn test_step_idempotency_after_retry(pool: PgPool) -> sqlx::Result<()> { fail_after_step2: false, // Don't fail, just complete }, SpawnOptions { - retry_strategy: Some(RetryStrategy::Fixed { base_seconds: 0 }), + retry_strategy: Some(RetryStrategy::Fixed { + base_delay: Duration::from_secs(0), + }), max_attempts: Some(2), ..Default::default() }, @@ -382,8 +386,8 @@ async fn test_step_idempotency_after_retry(pool: PgPool) -> sqlx::Result<()> { let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), ..Default::default() }) .await; @@ -428,7 +432,7 @@ async fn test_cpu_bound_outlives_lease(pool: PgPool) -> sqlx::Result<()> { let start_time = chrono::Utc::now(); set_fake_time(&pool, start_time).await?; - let claim_timeout = 2; // 2 second lease + let claim_timeout = Duration::from_secs(2); // 2 second lease // Spawn a CPU-bound task that runs for 10 seconds (way longer than lease) let spawn_result = client @@ -437,7 +441,9 @@ async fn test_cpu_bound_outlives_lease(pool: PgPool) -> sqlx::Result<()> { duration_ms: 10000, // 10 seconds }, SpawnOptions { - retry_strategy: Some(RetryStrategy::Fixed { base_seconds: 0 }), + retry_strategy: Some(RetryStrategy::Fixed { + base_delay: Duration::from_secs(0), + }), max_attempts: Some(3), ..Default::default() }, @@ -447,7 +453,7 @@ async fn test_cpu_bound_outlives_lease(pool: PgPool) -> sqlx::Result<()> { let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, + poll_interval: Duration::from_millis(50), claim_timeout, ..Default::default() }) @@ -457,7 +463,7 @@ async fn test_cpu_bound_outlives_lease(pool: PgPool) -> sqlx::Result<()> { tokio::time::sleep(Duration::from_millis(200)).await; // Advance time past the lease timeout - advance_time(&pool, claim_timeout as i64 + 1).await?; + advance_time(&pool, claim_timeout.as_secs() as i64 + 1).await?; // Give time for reclaim to happen tokio::time::sleep(Duration::from_millis(1000)).await; @@ -485,7 +491,7 @@ async fn test_slow_task_outlives_lease(pool: PgPool) -> sqlx::Result<()> { client.create_queue(None).await.unwrap(); client.register::().await.unwrap(); - let claim_timeout = 2; // 2 second lease + let claim_timeout = Duration::from_secs(2); // 2 second lease // Spawn a slow task that sleeps for 30 seconds without heartbeat let spawn_result = client @@ -494,7 +500,9 @@ async fn test_slow_task_outlives_lease(pool: PgPool) -> sqlx::Result<()> { sleep_ms: 30000, // 30 seconds - much longer than lease }, SpawnOptions { - retry_strategy: Some(RetryStrategy::Fixed { base_seconds: 0 }), + retry_strategy: Some(RetryStrategy::Fixed { + base_delay: Duration::from_secs(0), + }), max_attempts: Some(5), ..Default::default() }, @@ -504,7 +512,7 @@ async fn test_slow_task_outlives_lease(pool: PgPool) -> sqlx::Result<()> { let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, + poll_interval: Duration::from_millis(50), claim_timeout, ..Default::default() }) @@ -517,7 +525,7 @@ async fn test_slow_task_outlives_lease(pool: PgPool) -> sqlx::Result<()> { assert_eq!(state, Some("running".to_string())); // Wait for real time to pass the lease timeout - tokio::time::sleep(Duration::from_secs(claim_timeout + 2)).await; + tokio::time::sleep(claim_timeout + Duration::from_secs(2)).await; // Verify a new run was created (reclaim happened) let run_count = count_runs_for_task(&pool, "crash_slow", spawn_result.task_id).await?; diff --git a/tests/event_test.rs b/tests/event_test.rs index f162800..9ab62b9 100644 --- a/tests/event_test.rs +++ b/tests/event_test.rs @@ -40,8 +40,8 @@ async fn test_emit_event_wakes_waiter(pool: PgPool) -> sqlx::Result<()> { let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), ..Default::default() }) .await; @@ -113,8 +113,8 @@ async fn test_event_already_emitted_returns_immediately(pool: PgPool) -> sqlx::R let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), ..Default::default() }) .await; @@ -170,8 +170,8 @@ async fn test_event_timeout_triggers(pool: PgPool) -> sqlx::Result<()> { let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), ..Default::default() }) .await; @@ -226,8 +226,8 @@ async fn test_multiple_waiters_same_event(pool: PgPool) -> sqlx::Result<()> { let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), concurrency: 3, ..Default::default() }) @@ -279,7 +279,9 @@ async fn test_event_payload_preserved_on_retry(pool: PgPool) -> sqlx::Result<()> event_name: "retry_event".to_string(), }, SpawnOptions { - retry_strategy: Some(RetryStrategy::Fixed { base_seconds: 0 }), + retry_strategy: Some(RetryStrategy::Fixed { + base_delay: Duration::from_secs(0), + }), max_attempts: Some(2), ..Default::default() }, @@ -289,8 +291,8 @@ async fn test_event_payload_preserved_on_retry(pool: PgPool) -> sqlx::Result<()> let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), ..Default::default() }) .await; @@ -359,8 +361,8 @@ async fn test_event_last_write_wins(pool: PgPool) -> sqlx::Result<()> { let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), ..Default::default() }) .await; @@ -410,8 +412,8 @@ async fn test_multiple_distinct_events(pool: PgPool) -> sqlx::Result<()> { let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), ..Default::default() }) .await; @@ -480,8 +482,8 @@ async fn test_event_write_does_not_propagate_after_wake(pool: PgPool) -> sqlx::R let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), ..Default::default() }) .await; @@ -550,8 +552,8 @@ async fn test_emit_from_different_task(pool: PgPool) -> sqlx::Result<()> { let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), concurrency: 2, ..Default::default() }) @@ -725,8 +727,8 @@ async fn test_event_timeout_error_payload(pool: PgPool) -> sqlx::Result<()> { let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), ..Default::default() }) .await; diff --git a/tests/execution_test.rs b/tests/execution_test.rs index e32c45a..e91f7f5 100644 --- a/tests/execution_test.rs +++ b/tests/execution_test.rs @@ -83,8 +83,8 @@ async fn test_simple_task_executes_and_completes(pool: PgPool) -> sqlx::Result<( // Start worker with short poll interval let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), ..Default::default() }) .await; @@ -129,8 +129,8 @@ async fn test_task_state_transitions(pool: PgPool) -> sqlx::Result<()> { // Start worker let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), ..Default::default() }) .await; @@ -159,8 +159,8 @@ async fn test_empty_params_task_executes(pool: PgPool) -> sqlx::Result<()> { let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), ..Default::default() }) .await; @@ -196,8 +196,8 @@ async fn test_multi_step_task_completes_all_steps(pool: PgPool) -> sqlx::Result< let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), ..Default::default() }) .await; @@ -246,8 +246,8 @@ async fn test_multiple_tasks_execute_concurrently(pool: PgPool) -> sqlx::Result< // Start worker with concurrency > 1 let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), concurrency: 5, ..Default::default() }) @@ -285,8 +285,8 @@ async fn test_worker_concurrency_limit_respected(pool: PgPool) -> sqlx::Result<( // Start worker with low concurrency let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), concurrency: 2, // Only 2 at a time ..Default::default() }) @@ -323,8 +323,8 @@ async fn test_worker_graceful_shutdown_waits(pool: PgPool) -> sqlx::Result<()> { let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), ..Default::default() }) .await; @@ -394,8 +394,8 @@ async fn test_task_result_stored_correctly(pool: PgPool) -> sqlx::Result<()> { let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), ..Default::default() }) .await; @@ -430,8 +430,8 @@ async fn test_research_task_readme_example(pool: PgPool) -> sqlx::Result<()> { let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), ..Default::default() }) .await; @@ -478,8 +478,8 @@ async fn test_convenience_methods_execute(pool: PgPool) -> sqlx::Result<()> { let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), ..Default::default() }) .await; @@ -529,8 +529,8 @@ async fn test_multiple_convenience_calls_produce_different_values( let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), ..Default::default() }) .await; @@ -572,8 +572,8 @@ async fn test_reserved_prefix_rejected(pool: PgPool) -> sqlx::Result<()> { let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), ..Default::default() }) .await; @@ -611,8 +611,8 @@ async fn test_reserved_prefix_error_payload(pool: PgPool) -> sqlx::Result<()> { let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), ..Default::default() }) .await; @@ -672,8 +672,8 @@ async fn test_long_running_task_with_heartbeat_completes(pool: PgPool) -> sqlx:: let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 1, // 1 second claim timeout - task runs for 3x this duration + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(1), // 1 second claim timeout - task runs for 3x this duration ..Default::default() }) .await; @@ -791,8 +791,8 @@ async fn test_task_uses_application_state(pool: PgPool) -> sqlx::Result<()> { // Start worker let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), ..Default::default() }) .await; diff --git a/tests/fanout_test.rs b/tests/fanout_test.rs index ec039f9..dec6e7f 100644 --- a/tests/fanout_test.rs +++ b/tests/fanout_test.rs @@ -104,8 +104,8 @@ async fn test_spawn_single_child_and_join(pool: PgPool) -> sqlx::Result<()> { // Start worker with concurrency to handle both parent and child let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), concurrency: 2, ..Default::default() }) @@ -152,8 +152,8 @@ async fn test_spawn_multiple_children_and_join(pool: PgPool) -> sqlx::Result<()> // Start worker with high concurrency let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), concurrency: 10, ..Default::default() }) @@ -202,8 +202,8 @@ async fn test_child_has_parent_task_id(pool: PgPool) -> sqlx::Result<()> { let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), concurrency: 2, ..Default::default() }) @@ -265,8 +265,8 @@ async fn test_child_failure_propagates_to_parent(pool: PgPool) -> sqlx::Result<( let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), concurrency: 4, ..Default::default() }) @@ -305,8 +305,8 @@ async fn test_cascade_cancel_when_parent_cancelled(pool: PgPool) -> sqlx::Result // Start worker to let parent spawn child let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), concurrency: 2, // Process both parent and child ..Default::default() }) @@ -370,8 +370,8 @@ async fn test_spawn_by_name_from_task_context(pool: PgPool) -> sqlx::Result<()> // Start worker with concurrency to handle both parent and child let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), concurrency: 2, ..Default::default() }) @@ -431,8 +431,8 @@ async fn test_join_cancelled_child_returns_child_cancelled_error(pool: PgPool) - let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), concurrency: 2, // Need concurrency for both parent and child ..Default::default() }) @@ -519,8 +519,8 @@ async fn test_child_failed_error_contains_message(pool: PgPool) -> sqlx::Result< let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), concurrency: 4, ..Default::default() }) @@ -599,8 +599,8 @@ async fn test_join_timeout_when_parent_claim_expires(pool: PgPool) -> sqlx::Resu // When the claim expires while suspended, the task should eventually timeout let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 2, // Very short - 2 seconds + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(2), // Very short - 2 seconds concurrency: 2, ..Default::default() }) diff --git a/tests/lease_test.rs b/tests/lease_test.rs index 9456429..16b7631 100644 --- a/tests/lease_test.rs +++ b/tests/lease_test.rs @@ -33,7 +33,7 @@ async fn test_claim_sets_correct_expiry(pool: PgPool) -> sqlx::Result<()> { let start_time = chrono::Utc::now(); set_fake_time(&pool, start_time).await?; - let claim_timeout = 30; // 30 seconds + let claim_timeout = Duration::from_secs(30); // 30 seconds // Spawn a task that will take a while (uses heartbeats) let spawn_result = client @@ -46,7 +46,7 @@ async fn test_claim_sets_correct_expiry(pool: PgPool) -> sqlx::Result<()> { let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, + poll_interval: Duration::from_millis(50), claim_timeout, ..Default::default() }) @@ -66,13 +66,13 @@ async fn test_claim_sets_correct_expiry(pool: PgPool) -> sqlx::Result<()> { .expect("claim_expires_at should be set"); // Should be approximately start_time + claim_timeout seconds - let expected_expiry = start_time + chrono::Duration::seconds(claim_timeout as i64); + let expected_expiry = start_time + chrono::Duration::seconds(claim_timeout.as_secs() as i64); let diff = (claim_expires - expected_expiry).num_seconds().abs(); assert!( diff <= 2, "claim_expires_at should be ~{} seconds from start, got {} seconds diff", - claim_timeout, + claim_timeout.as_secs(), diff ); @@ -91,7 +91,7 @@ async fn test_heartbeat_extends_lease(pool: PgPool) -> sqlx::Result<()> { let start_time = chrono::Utc::now(); set_fake_time(&pool, start_time).await?; - let claim_timeout = 10; // 10 seconds + let claim_timeout = Duration::from_secs(10); // 10 seconds // Spawn task that heartbeats frequently let spawn_result = client @@ -104,7 +104,7 @@ async fn test_heartbeat_extends_lease(pool: PgPool) -> sqlx::Result<()> { let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, + poll_interval: Duration::from_millis(50), claim_timeout, ..Default::default() }) @@ -165,7 +165,7 @@ async fn test_checkpoint_extends_lease(pool: PgPool) -> sqlx::Result<()> { let start_time = chrono::Utc::now(); set_fake_time(&pool, start_time).await?; - let claim_timeout = 30; + let claim_timeout = Duration::from_secs(30); let num_steps = 20; // Enough steps to observe lease extension // Spawn task that creates many checkpoints @@ -176,7 +176,7 @@ async fn test_checkpoint_extends_lease(pool: PgPool) -> sqlx::Result<()> { let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, + poll_interval: Duration::from_millis(50), claim_timeout, ..Default::default() }) @@ -243,8 +243,8 @@ async fn test_heartbeat_detects_cancellation(pool: PgPool) -> sqlx::Result<()> { let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), ..Default::default() }) .await; diff --git a/tests/partition_test.rs b/tests/partition_test.rs index 4003940..051581c 100644 --- a/tests/partition_test.rs +++ b/tests/partition_test.rs @@ -36,7 +36,9 @@ async fn test_db_connection_lost_during_checkpoint(pool: PgPool) -> sqlx::Result fail_after_step2: true, }, SpawnOptions { - retry_strategy: Some(RetryStrategy::Fixed { base_seconds: 0 }), + retry_strategy: Some(RetryStrategy::Fixed { + base_delay: Duration::from_secs(0), + }), max_attempts: Some(3), ..Default::default() }, @@ -46,8 +48,8 @@ async fn test_db_connection_lost_during_checkpoint(pool: PgPool) -> sqlx::Result let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), ..Default::default() }) .await; @@ -89,7 +91,7 @@ async fn test_stale_worker_checkpoint_rejected(pool: PgPool) -> sqlx::Result<()> client.create_queue(None).await.unwrap(); client.register::().await.unwrap(); - let claim_timeout = 2; // Short lease + let claim_timeout = Duration::from_secs(2); // Short lease // Spawn a slow task let spawn_result = client @@ -98,7 +100,9 @@ async fn test_stale_worker_checkpoint_rejected(pool: PgPool) -> sqlx::Result<()> sleep_ms: 30000, // 30 seconds }, SpawnOptions { - retry_strategy: Some(RetryStrategy::Fixed { base_seconds: 0 }), + retry_strategy: Some(RetryStrategy::Fixed { + base_delay: Duration::from_secs(0), + }), max_attempts: Some(5), ..Default::default() }, @@ -109,7 +113,7 @@ async fn test_stale_worker_checkpoint_rejected(pool: PgPool) -> sqlx::Result<()> // First worker claims the task let worker1 = client .start_worker(WorkerOptions { - poll_interval: 0.05, + poll_interval: Duration::from_millis(50), claim_timeout, ..Default::default() }) @@ -122,13 +126,13 @@ async fn test_stale_worker_checkpoint_rejected(pool: PgPool) -> sqlx::Result<()> worker1.shutdown().await; // Wait for lease to expire - tokio::time::sleep(Duration::from_secs(claim_timeout + 1)).await; + tokio::time::sleep(claim_timeout + Duration::from_secs(1)).await; // Second worker reclaims let worker2 = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 60, // Longer timeout + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(60), // Longer timeout ..Default::default() }) .await; diff --git a/tests/retry_test.rs b/tests/retry_test.rs index 4c6decc..74e2422 100644 --- a/tests/retry_test.rs +++ b/tests/retry_test.rs @@ -45,8 +45,8 @@ async fn test_retry_strategy_none_no_retry(pool: PgPool) -> sqlx::Result<()> { let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), ..Default::default() }) .await; @@ -87,7 +87,9 @@ async fn test_retry_strategy_fixed_delay(pool: PgPool) -> sqlx::Result<()> { error_message: "intentional failure".to_string(), }, SpawnOptions { - retry_strategy: Some(RetryStrategy::Fixed { base_seconds: 5 }), + retry_strategy: Some(RetryStrategy::Fixed { + base_delay: Duration::from_secs(5), + }), max_attempts: Some(2), ..Default::default() }, @@ -97,8 +99,8 @@ async fn test_retry_strategy_fixed_delay(pool: PgPool) -> sqlx::Result<()> { let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), ..Default::default() }) .await; @@ -164,9 +166,9 @@ async fn test_retry_strategy_exponential_backoff(pool: PgPool) -> sqlx::Result<( }, SpawnOptions { retry_strategy: Some(RetryStrategy::Exponential { - base_seconds: 2, + base_delay: Duration::from_secs(2), factor: 2.0, - max_seconds: 100, + max_backoff: Duration::from_secs(100), }), max_attempts: Some(3), ..Default::default() @@ -177,8 +179,8 @@ async fn test_retry_strategy_exponential_backoff(pool: PgPool) -> sqlx::Result<( let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), ..Default::default() }) .await; @@ -245,7 +247,9 @@ async fn test_max_attempts_honored(pool: PgPool) -> sqlx::Result<()> { error_message: "intentional failure".to_string(), }, SpawnOptions { - retry_strategy: Some(RetryStrategy::Fixed { base_seconds: 0 }), + retry_strategy: Some(RetryStrategy::Fixed { + base_delay: Duration::from_secs(0), + }), max_attempts: Some(3), ..Default::default() }, @@ -255,8 +259,8 @@ async fn test_max_attempts_honored(pool: PgPool) -> sqlx::Result<()> { let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), ..Default::default() }) .await; diff --git a/tests/spawn_test.rs b/tests/spawn_test.rs index e261179..5a79dd2 100644 --- a/tests/spawn_test.rs +++ b/tests/spawn_test.rs @@ -6,6 +6,7 @@ use common::tasks::{EchoParams, EchoTask, FailingParams, FailingTask}; use durable::{CancellationPolicy, Durable, MIGRATOR, RetryStrategy, SpawnOptions}; use sqlx::PgPool; use std::collections::HashMap; +use std::time::Duration; /// Helper to create a Durable client from the test pool. async fn create_client(pool: PgPool, queue_name: &str) -> Durable { @@ -135,7 +136,9 @@ async fn test_spawn_with_retry_strategy_fixed(pool: PgPool) -> sqlx::Result<()> client.register::().await.unwrap(); let options = SpawnOptions { - retry_strategy: Some(RetryStrategy::Fixed { base_seconds: 10 }), + retry_strategy: Some(RetryStrategy::Fixed { + base_delay: Duration::from_secs(10), + }), ..Default::default() }; @@ -162,9 +165,9 @@ async fn test_spawn_with_retry_strategy_exponential(pool: PgPool) -> sqlx::Resul let options = SpawnOptions { retry_strategy: Some(RetryStrategy::Exponential { - base_seconds: 5, + base_delay: Duration::from_secs(5), factor: 2.0, - max_seconds: 300, + max_backoff: Duration::from_secs(300), }), ..Default::default() }; @@ -222,8 +225,8 @@ async fn test_spawn_with_cancellation_policy(pool: PgPool) -> sqlx::Result<()> { let options = SpawnOptions { cancellation: Some(CancellationPolicy { - max_delay: Some(60), - max_duration: Some(300), + max_pending_time: Some(Duration::from_secs(60)), + max_running_time: Some(Duration::from_secs(300)), }), ..Default::default() }; @@ -279,7 +282,9 @@ async fn test_spawn_by_name_with_options(pool: PgPool) -> sqlx::Result<()> { let options = SpawnOptions { max_attempts: Some(3), - retry_strategy: Some(RetryStrategy::Fixed { base_seconds: 5 }), + retry_strategy: Some(RetryStrategy::Fixed { + base_delay: Duration::from_secs(5), + }), ..Default::default() }; diff --git a/tests/telemetry_test.rs b/tests/telemetry_test.rs index f5a48ba..d1c430e 100644 --- a/tests/telemetry_test.rs +++ b/tests/telemetry_test.rs @@ -155,8 +155,8 @@ async fn test_task_lifecycle_metrics(pool: PgPool) -> sqlx::Result<()> { // Start worker let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), ..Default::default() }) .await; @@ -240,8 +240,8 @@ async fn test_task_failure_metrics(pool: PgPool) -> sqlx::Result<()> { let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), ..Default::default() }) .await; @@ -284,8 +284,8 @@ async fn test_worker_gauge_metrics(pool: PgPool) -> sqlx::Result<()> { let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), ..Default::default() }) .await; @@ -352,8 +352,8 @@ async fn test_checkpoint_metrics(pool: PgPool) -> sqlx::Result<()> { let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), ..Default::default() }) .await; @@ -412,8 +412,8 @@ async fn test_task_execution_duration_metrics(pool: PgPool) -> sqlx::Result<()> let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), ..Default::default() }) .await; From b95d1617dacb127d02b4436e4d9c9be9506b42cf Mon Sep 17 00:00:00 2001 From: Viraj Mehta Date: Wed, 24 Dec 2025 11:20:33 -0500 Subject: [PATCH 2/4] prevent durations that are subsecond for claim timeout --- benches/checkpoint.rs | 12 ++++++++---- benches/concurrency.rs | 6 ++++-- benches/throughput.rs | 6 ++++-- src/client.rs | 17 ++++++++++++++--- src/context.rs | 10 +++++++++- src/error.rs | 9 +++++++++ tests/checkpoint_test.rs | 14 +++++++------- tests/concurrency_test.rs | 4 ++-- tests/crash_test.rs | 22 +++++++++++----------- tests/event_test.rs | 20 ++++++++++---------- tests/execution_test.rs | 30 +++++++++++++++--------------- tests/fanout_test.rs | 18 +++++++++--------- tests/lease_test.rs | 8 ++++---- tests/partition_test.rs | 6 +++--- tests/retry_test.rs | 8 ++++---- tests/telemetry_test.rs | 10 +++++----- 16 files changed, 118 insertions(+), 82 deletions(-) diff --git a/benches/checkpoint.rs b/benches/checkpoint.rs index 753900a..6591512 100644 --- a/benches/checkpoint.rs +++ b/benches/checkpoint.rs @@ -42,7 +42,8 @@ fn bench_step_cache_miss(c: &mut Criterion) { let worker = ctx .client .start_worker(bench_worker_options(1, Duration::from_secs(120))) - .await; + .await + .unwrap(); wait_for_tasks_complete(&ctx.pool, &ctx.queue_name, 1, 60).await; total_time += start.elapsed(); @@ -94,7 +95,8 @@ fn bench_step_cache_hit(c: &mut Criterion) { let worker = ctx .client .start_worker(bench_worker_options(1, Duration::from_secs(120))) - .await; + .await + .unwrap(); wait_for_tasks_complete(&ctx.pool, &ctx.queue_name, 1, 60).await; worker.shutdown().await; @@ -126,7 +128,8 @@ fn bench_step_cache_hit(c: &mut Criterion) { let worker = ctx .client .start_worker(bench_worker_options(1, Duration::from_secs(120))) - .await; + .await + .unwrap(); wait_for_tasks_complete(&ctx.pool, &ctx.queue_name, 1, 60).await; total_time += start.elapsed(); @@ -182,7 +185,8 @@ fn bench_large_payload_checkpoint(c: &mut Criterion) { let worker = ctx .client .start_worker(bench_worker_options(1, Duration::from_secs(120))) - .await; + .await + .unwrap(); wait_for_tasks_complete(&ctx.pool, &ctx.queue_name, 1, 120).await; total_time += start.elapsed(); diff --git a/benches/concurrency.rs b/benches/concurrency.rs index 1ff2cd3..2637605 100644 --- a/benches/concurrency.rs +++ b/benches/concurrency.rs @@ -62,7 +62,8 @@ fn bench_concurrent_claims(c: &mut Criterion) { 1, Duration::from_secs(60), )) - .await; + .await + .unwrap(); // Wait a bit then shutdown tokio::time::sleep(Duration::from_secs(15)).await; @@ -151,7 +152,8 @@ fn bench_claim_latency_distribution(c: &mut Criterion) { 4, Duration::from_secs(60), )) - .await; + .await + .unwrap(); tokio::time::sleep(Duration::from_secs(20)).await; worker.shutdown().await; diff --git a/benches/throughput.rs b/benches/throughput.rs index af1d011..ef7d9a3 100644 --- a/benches/throughput.rs +++ b/benches/throughput.rs @@ -76,7 +76,8 @@ fn bench_task_throughput(c: &mut Criterion) { concurrency, Duration::from_secs(60), )) - .await; + .await + .unwrap(); wait_for_tasks_complete( &ctx.pool, @@ -118,7 +119,8 @@ fn bench_e2e_completion(c: &mut Criterion) { let worker = ctx .client .start_worker(bench_worker_options(1, Duration::from_secs(60))) - .await; + .await + .unwrap(); let start = std::time::Instant::now(); for i in 0..iters { diff --git a/src/client.rs b/src/client.rs index 70432a4..84042b9 100644 --- a/src/client.rs +++ b/src/client.rs @@ -4,6 +4,7 @@ use sqlx::{Executor, PgPool, Postgres}; use std::collections::HashMap; use std::marker::PhantomData; use std::sync::Arc; +use std::time::Duration; use tokio::sync::RwLock; use uuid::Uuid; @@ -599,15 +600,25 @@ where } /// Start a worker that processes tasks from the queue - pub async fn start_worker(&self, options: WorkerOptions) -> Worker { - Worker::start( + /// + /// # Errors + /// + /// Returns [`DurableError::InvalidConfiguration`] if `claim_timeout` is less than 1 second. + pub async fn start_worker(&self, options: WorkerOptions) -> DurableResult { + if options.claim_timeout < Duration::from_secs(1) { + return Err(DurableError::InvalidConfiguration { + reason: "claim_timeout must be at least 1 second".to_string(), + }); + } + + Ok(Worker::start( self.pool.clone(), self.queue_name.clone(), self.registry.clone(), options, self.state.clone(), ) - .await + .await) } /// Close the client. Closes the pool if owned. diff --git a/src/context.rs b/src/context.rs index b49eefa..3071119 100644 --- a/src/context.rs +++ b/src/context.rs @@ -442,9 +442,11 @@ where /// /// # Arguments /// * `duration` - Extension duration. Defaults to original claim_timeout. + /// Must be at least 1 second. /// /// # Errors - /// Returns `TaskError::Control(Cancelled)` if the task was cancelled. + /// - Returns `TaskError::Validation` if duration is less than 1 second. + /// - Returns `TaskError::Control(Cancelled)` if the task was cancelled. #[cfg_attr( feature = "telemetry", tracing::instrument( @@ -456,6 +458,12 @@ where pub async fn heartbeat(&self, duration: Option) -> TaskResult<()> { let extend_by = duration.unwrap_or(self.claim_timeout); + if extend_by < std::time::Duration::from_secs(1) { + return Err(TaskError::Validation { + message: "heartbeat duration must be at least 1 second".to_string(), + }); + } + let query = "SELECT durable.extend_claim($1, $2, $3)"; sqlx::query(query) .bind(&self.queue_name) diff --git a/src/error.rs b/src/error.rs index af57c9d..e7d7df4 100644 --- a/src/error.rs +++ b/src/error.rs @@ -271,6 +271,15 @@ pub enum DurableError { /// The reason the event name was invalid. reason: String, }, + + /// Configuration validation failed. + /// + /// Returned when worker options contain invalid values. + #[error("invalid configuration: {reason}")] + InvalidConfiguration { + /// The reason the configuration was invalid. + reason: String, + }, } /// Result type alias for Client API operations. diff --git a/tests/checkpoint_test.rs b/tests/checkpoint_test.rs index 6977adc..d5fcc9f 100644 --- a/tests/checkpoint_test.rs +++ b/tests/checkpoint_test.rs @@ -59,7 +59,7 @@ async fn test_checkpoint_prevents_step_reexecution(pool: PgPool) -> sqlx::Result claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await; + .await.unwrap(); // Wait for task to fail (will hit max attempts since fail_after_step2 is always true) let terminal = wait_for_task_terminal( @@ -100,7 +100,7 @@ async fn test_checkpoint_prevents_step_reexecution(pool: PgPool) -> sqlx::Result claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await; + .await.unwrap(); let terminal2 = wait_for_task_terminal( &pool, @@ -156,7 +156,7 @@ async fn test_deterministic_rand_preserved_on_retry(pool: PgPool) -> sqlx::Resul claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await; + .await.unwrap(); let terminal = wait_for_task_terminal( &pool, @@ -219,7 +219,7 @@ async fn test_deterministic_now_preserved_on_retry(pool: PgPool) -> sqlx::Result claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await; + .await.unwrap(); let terminal = wait_for_task_terminal( &pool, @@ -279,7 +279,7 @@ async fn test_deterministic_uuid7_preserved_on_retry(pool: PgPool) -> sqlx::Resu claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await; + .await.unwrap(); let terminal = wait_for_task_terminal( &pool, @@ -328,7 +328,7 @@ async fn test_long_workflow_many_steps(pool: PgPool) -> sqlx::Result<()> { claim_timeout: Duration::from_secs(60), ..Default::default() }) - .await; + .await.unwrap(); let terminal = wait_for_task_terminal( &pool, @@ -380,7 +380,7 @@ async fn test_large_payload_checkpoint(pool: PgPool) -> sqlx::Result<()> { claim_timeout: Duration::from_secs(60), ..Default::default() }) - .await; + .await.unwrap(); let terminal = wait_for_task_terminal( &pool, diff --git a/tests/concurrency_test.rs b/tests/concurrency_test.rs index 6e6cc6a..d4411f3 100644 --- a/tests/concurrency_test.rs +++ b/tests/concurrency_test.rs @@ -62,7 +62,7 @@ async fn test_task_claimed_by_exactly_one_worker(pool: PgPool) -> sqlx::Result<( concurrency: 1, ..Default::default() }) - .await; + .await.unwrap(); // Let workers run for a bit tokio::time::sleep(Duration::from_millis(500)).await; @@ -161,7 +161,7 @@ async fn test_concurrent_claims_with_skip_locked(pool: PgPool) -> sqlx::Result<( concurrency: 5, // Each worker handles multiple tasks ..Default::default() }) - .await; + .await.unwrap(); // Let workers process tasks tokio::time::sleep(Duration::from_secs(3)).await; diff --git a/tests/crash_test.rs b/tests/crash_test.rs index db707c0..cf7f819 100644 --- a/tests/crash_test.rs +++ b/tests/crash_test.rs @@ -60,7 +60,7 @@ async fn test_crash_mid_step_resumes_from_checkpoint(pool: PgPool) -> sqlx::Resu claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await; + .await.unwrap(); // Wait for some progress tokio::time::sleep(Duration::from_millis(500)).await; @@ -83,7 +83,7 @@ async fn test_crash_mid_step_resumes_from_checkpoint(pool: PgPool) -> sqlx::Resu claim_timeout: Duration::from_secs(5), // Short timeout to reclaim quickly ..Default::default() }) - .await; + .await.unwrap(); // Wait for task to reach terminal state let terminal = wait_for_task_terminal( @@ -133,7 +133,7 @@ async fn test_worker_drop_without_shutdown(pool: PgPool) -> sqlx::Result<()> { claim_timeout, ..Default::default() }) - .await; + .await.unwrap(); // Wait for task to start tokio::time::sleep(Duration::from_millis(500)).await; @@ -156,7 +156,7 @@ async fn test_worker_drop_without_shutdown(pool: PgPool) -> sqlx::Result<()> { claim_timeout: Duration::from_secs(60), // Longer timeout for second worker ..Default::default() }) - .await; + .await.unwrap(); // Give time for reclaim and some progress tokio::time::sleep(Duration::from_secs(2)).await; @@ -204,7 +204,7 @@ async fn test_lease_expiration_allows_reclaim(pool: PgPool) -> sqlx::Result<()> claim_timeout, ..Default::default() }) - .await; + .await.unwrap(); // Wait for task to start tokio::time::sleep(Duration::from_millis(200)).await; @@ -225,7 +225,7 @@ async fn test_lease_expiration_allows_reclaim(pool: PgPool) -> sqlx::Result<()> claim_timeout: Duration::from_secs(60), // Longer timeout this time ..Default::default() }) - .await; + .await.unwrap(); // Give second worker time to reclaim tokio::time::sleep(Duration::from_millis(500)).await; @@ -267,7 +267,7 @@ async fn test_heartbeat_prevents_lease_expiration(pool: PgPool) -> sqlx::Result< claim_timeout, ..Default::default() }) - .await; + .await.unwrap(); // Wait for task to complete let terminal = wait_for_task_terminal( @@ -316,7 +316,7 @@ async fn test_spawn_idempotency_after_retry(pool: PgPool) -> sqlx::Result<()> { concurrency: 2, // Handle parent and child ..Default::default() }) - .await; + .await.unwrap(); // Wait for parent to complete let terminal = wait_for_task_terminal( @@ -390,7 +390,7 @@ async fn test_step_idempotency_after_retry(pool: PgPool) -> sqlx::Result<()> { claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await; + .await.unwrap(); let terminal = wait_for_task_terminal( &pool, @@ -457,7 +457,7 @@ async fn test_cpu_bound_outlives_lease(pool: PgPool) -> sqlx::Result<()> { claim_timeout, ..Default::default() }) - .await; + .await.unwrap(); // Wait for task to be claimed and start tokio::time::sleep(Duration::from_millis(200)).await; @@ -516,7 +516,7 @@ async fn test_slow_task_outlives_lease(pool: PgPool) -> sqlx::Result<()> { claim_timeout, ..Default::default() }) - .await; + .await.unwrap(); // Wait for task to be claimed and start sleeping tokio::time::sleep(Duration::from_millis(500)).await; diff --git a/tests/event_test.rs b/tests/event_test.rs index 9ab62b9..70af0c9 100644 --- a/tests/event_test.rs +++ b/tests/event_test.rs @@ -44,7 +44,7 @@ async fn test_emit_event_wakes_waiter(pool: PgPool) -> sqlx::Result<()> { claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await; + .await.unwrap(); // Wait for task to start waiting tokio::time::sleep(Duration::from_millis(300)).await; @@ -117,7 +117,7 @@ async fn test_event_already_emitted_returns_immediately(pool: PgPool) -> sqlx::R claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await; + .await.unwrap(); // Task should complete quickly since event exists let terminal = wait_for_task_terminal( @@ -174,7 +174,7 @@ async fn test_event_timeout_triggers(pool: PgPool) -> sqlx::Result<()> { claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await; + .await.unwrap(); // Wait for task to fail due to timeout let terminal = wait_for_task_terminal( @@ -231,7 +231,7 @@ async fn test_multiple_waiters_same_event(pool: PgPool) -> sqlx::Result<()> { concurrency: 3, ..Default::default() }) - .await; + .await.unwrap(); // Wait for all tasks to start waiting tokio::time::sleep(Duration::from_millis(500)).await; @@ -295,7 +295,7 @@ async fn test_event_payload_preserved_on_retry(pool: PgPool) -> sqlx::Result<()> claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await; + .await.unwrap(); // Wait for task to start waiting for event tokio::time::sleep(Duration::from_millis(300)).await; @@ -365,7 +365,7 @@ async fn test_event_last_write_wins(pool: PgPool) -> sqlx::Result<()> { claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await; + .await.unwrap(); let terminal = wait_for_task_terminal( &pool, @@ -416,7 +416,7 @@ async fn test_multiple_distinct_events(pool: PgPool) -> sqlx::Result<()> { claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await; + .await.unwrap(); // Wait for task to start tokio::time::sleep(Duration::from_millis(300)).await; @@ -486,7 +486,7 @@ async fn test_event_write_does_not_propagate_after_wake(pool: PgPool) -> sqlx::R claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await; + .await.unwrap(); // Wait for task to start waiting for event tokio::time::sleep(Duration::from_millis(300)).await; @@ -557,7 +557,7 @@ async fn test_emit_from_different_task(pool: PgPool) -> sqlx::Result<()> { concurrency: 2, ..Default::default() }) - .await; + .await.unwrap(); // Wait for waiter to start tokio::time::sleep(Duration::from_millis(300)).await; @@ -731,7 +731,7 @@ async fn test_event_timeout_error_payload(pool: PgPool) -> sqlx::Result<()> { claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await; + .await.unwrap(); // Wait for task to fail due to timeout let terminal = wait_for_task_terminal( diff --git a/tests/execution_test.rs b/tests/execution_test.rs index e91f7f5..7466932 100644 --- a/tests/execution_test.rs +++ b/tests/execution_test.rs @@ -87,7 +87,7 @@ async fn test_simple_task_executes_and_completes(pool: PgPool) -> sqlx::Result<( claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await; + .await.unwrap(); // Wait for task to complete tokio::time::sleep(Duration::from_millis(500)).await; @@ -133,7 +133,7 @@ async fn test_task_state_transitions(pool: PgPool) -> sqlx::Result<()> { claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await; + .await.unwrap(); // Wait for task to complete tokio::time::sleep(Duration::from_millis(500)).await; @@ -163,7 +163,7 @@ async fn test_empty_params_task_executes(pool: PgPool) -> sqlx::Result<()> { claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await; + .await.unwrap(); tokio::time::sleep(Duration::from_millis(500)).await; worker.shutdown().await; @@ -200,7 +200,7 @@ async fn test_multi_step_task_completes_all_steps(pool: PgPool) -> sqlx::Result< claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await; + .await.unwrap(); tokio::time::sleep(Duration::from_millis(500)).await; worker.shutdown().await; @@ -251,7 +251,7 @@ async fn test_multiple_tasks_execute_concurrently(pool: PgPool) -> sqlx::Result< concurrency: 5, ..Default::default() }) - .await; + .await.unwrap(); // Wait for all tasks to complete tokio::time::sleep(Duration::from_millis(1000)).await; @@ -290,7 +290,7 @@ async fn test_worker_concurrency_limit_respected(pool: PgPool) -> sqlx::Result<( concurrency: 2, // Only 2 at a time ..Default::default() }) - .await; + .await.unwrap(); // Give some time for processing tokio::time::sleep(Duration::from_millis(2000)).await; @@ -327,7 +327,7 @@ async fn test_worker_graceful_shutdown_waits(pool: PgPool) -> sqlx::Result<()> { claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await; + .await.unwrap(); // Very short wait, then shutdown tokio::time::sleep(Duration::from_millis(200)).await; @@ -398,7 +398,7 @@ async fn test_task_result_stored_correctly(pool: PgPool) -> sqlx::Result<()> { claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await; + .await.unwrap(); tokio::time::sleep(Duration::from_millis(500)).await; worker.shutdown().await; @@ -434,7 +434,7 @@ async fn test_research_task_readme_example(pool: PgPool) -> sqlx::Result<()> { claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await; + .await.unwrap(); tokio::time::sleep(Duration::from_millis(500)).await; worker.shutdown().await; @@ -482,7 +482,7 @@ async fn test_convenience_methods_execute(pool: PgPool) -> sqlx::Result<()> { claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await; + .await.unwrap(); tokio::time::sleep(Duration::from_millis(500)).await; worker.shutdown().await; @@ -533,7 +533,7 @@ async fn test_multiple_convenience_calls_produce_different_values( claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await; + .await.unwrap(); tokio::time::sleep(Duration::from_millis(500)).await; worker.shutdown().await; @@ -576,7 +576,7 @@ async fn test_reserved_prefix_rejected(pool: PgPool) -> sqlx::Result<()> { claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await; + .await.unwrap(); tokio::time::sleep(Duration::from_millis(500)).await; worker.shutdown().await; @@ -615,7 +615,7 @@ async fn test_reserved_prefix_error_payload(pool: PgPool) -> sqlx::Result<()> { claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await; + .await.unwrap(); tokio::time::sleep(Duration::from_millis(500)).await; worker.shutdown().await; @@ -676,7 +676,7 @@ async fn test_long_running_task_with_heartbeat_completes(pool: PgPool) -> sqlx:: claim_timeout: Duration::from_secs(1), // 1 second claim timeout - task runs for 3x this duration ..Default::default() }) - .await; + .await.unwrap(); // Wait for task to complete (3 seconds + buffer) tokio::time::sleep(Duration::from_millis(4000)).await; @@ -795,7 +795,7 @@ async fn test_task_uses_application_state(pool: PgPool) -> sqlx::Result<()> { claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await; + .await.unwrap(); // Wait for task to complete tokio::time::sleep(Duration::from_millis(500)).await; diff --git a/tests/fanout_test.rs b/tests/fanout_test.rs index dec6e7f..8e6bd9c 100644 --- a/tests/fanout_test.rs +++ b/tests/fanout_test.rs @@ -109,7 +109,7 @@ async fn test_spawn_single_child_and_join(pool: PgPool) -> sqlx::Result<()> { concurrency: 2, ..Default::default() }) - .await; + .await.unwrap(); // Wait for tasks to complete tokio::time::sleep(Duration::from_millis(2000)).await; @@ -157,7 +157,7 @@ async fn test_spawn_multiple_children_and_join(pool: PgPool) -> sqlx::Result<()> concurrency: 10, ..Default::default() }) - .await; + .await.unwrap(); // Wait for tasks to complete tokio::time::sleep(Duration::from_millis(3000)).await; @@ -207,7 +207,7 @@ async fn test_child_has_parent_task_id(pool: PgPool) -> sqlx::Result<()> { concurrency: 2, ..Default::default() }) - .await; + .await.unwrap(); tokio::time::sleep(Duration::from_millis(2000)).await; worker.shutdown().await; @@ -270,7 +270,7 @@ async fn test_child_failure_propagates_to_parent(pool: PgPool) -> sqlx::Result<( concurrency: 4, ..Default::default() }) - .await; + .await.unwrap(); // Wait for tasks to complete - longer since child needs to fail first, then parent tokio::time::sleep(Duration::from_millis(5000)).await; @@ -310,7 +310,7 @@ async fn test_cascade_cancel_when_parent_cancelled(pool: PgPool) -> sqlx::Result concurrency: 2, // Process both parent and child ..Default::default() }) - .await; + .await.unwrap(); // Give time for parent to spawn child and child to start tokio::time::sleep(Duration::from_millis(500)).await; @@ -375,7 +375,7 @@ async fn test_spawn_by_name_from_task_context(pool: PgPool) -> sqlx::Result<()> concurrency: 2, ..Default::default() }) - .await; + .await.unwrap(); // Wait for tasks to complete tokio::time::sleep(Duration::from_millis(2000)).await; @@ -436,7 +436,7 @@ async fn test_join_cancelled_child_returns_child_cancelled_error(pool: PgPool) - concurrency: 2, // Need concurrency for both parent and child ..Default::default() }) - .await; + .await.unwrap(); // Wait for child to be spawned and start tokio::time::sleep(Duration::from_millis(500)).await; @@ -524,7 +524,7 @@ async fn test_child_failed_error_contains_message(pool: PgPool) -> sqlx::Result< concurrency: 4, ..Default::default() }) - .await; + .await.unwrap(); // Wait for parent to fail let terminal = wait_for_task_terminal( @@ -604,7 +604,7 @@ async fn test_join_timeout_when_parent_claim_expires(pool: PgPool) -> sqlx::Resu concurrency: 2, ..Default::default() }) - .await; + .await.unwrap(); // Wait for parent to fail (should timeout when claim expires) let terminal = wait_for_task_terminal( diff --git a/tests/lease_test.rs b/tests/lease_test.rs index 16b7631..557e498 100644 --- a/tests/lease_test.rs +++ b/tests/lease_test.rs @@ -50,7 +50,7 @@ async fn test_claim_sets_correct_expiry(pool: PgPool) -> sqlx::Result<()> { claim_timeout, ..Default::default() }) - .await; + .await.unwrap(); // Wait for the task to be claimed tokio::time::sleep(Duration::from_millis(200)).await; @@ -108,7 +108,7 @@ async fn test_heartbeat_extends_lease(pool: PgPool) -> sqlx::Result<()> { claim_timeout, ..Default::default() }) - .await; + .await.unwrap(); // Wait for task to be claimed tokio::time::sleep(Duration::from_millis(200)).await; @@ -180,7 +180,7 @@ async fn test_checkpoint_extends_lease(pool: PgPool) -> sqlx::Result<()> { claim_timeout, ..Default::default() }) - .await; + .await.unwrap(); // Wait for task to start running tokio::time::sleep(Duration::from_millis(200)).await; @@ -247,7 +247,7 @@ async fn test_heartbeat_detects_cancellation(pool: PgPool) -> sqlx::Result<()> { claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await; + .await.unwrap(); // Wait for task to start executing tokio::time::sleep(Duration::from_millis(500)).await; diff --git a/tests/partition_test.rs b/tests/partition_test.rs index 051581c..8a550cf 100644 --- a/tests/partition_test.rs +++ b/tests/partition_test.rs @@ -52,7 +52,7 @@ async fn test_db_connection_lost_during_checkpoint(pool: PgPool) -> sqlx::Result claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await; + .await.unwrap(); // Wait for task to fail (will retry but always fail after step 2) let terminal = wait_for_task_terminal( @@ -117,7 +117,7 @@ async fn test_stale_worker_checkpoint_rejected(pool: PgPool) -> sqlx::Result<()> claim_timeout, ..Default::default() }) - .await; + .await.unwrap(); // Wait for task to be claimed tokio::time::sleep(Duration::from_millis(500)).await; @@ -135,7 +135,7 @@ async fn test_stale_worker_checkpoint_rejected(pool: PgPool) -> sqlx::Result<()> claim_timeout: Duration::from_secs(60), // Longer timeout ..Default::default() }) - .await; + .await.unwrap(); // Wait for reclaim to happen tokio::time::sleep(Duration::from_secs(2)).await; diff --git a/tests/retry_test.rs b/tests/retry_test.rs index 74e2422..fe692b1 100644 --- a/tests/retry_test.rs +++ b/tests/retry_test.rs @@ -49,7 +49,7 @@ async fn test_retry_strategy_none_no_retry(pool: PgPool) -> sqlx::Result<()> { claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await; + .await.unwrap(); let terminal = wait_for_task_terminal( &pool, @@ -103,7 +103,7 @@ async fn test_retry_strategy_fixed_delay(pool: PgPool) -> sqlx::Result<()> { claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await; + .await.unwrap(); // Wait for first attempt to fail tokio::time::sleep(Duration::from_millis(300)).await; @@ -183,7 +183,7 @@ async fn test_retry_strategy_exponential_backoff(pool: PgPool) -> sqlx::Result<( claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await; + .await.unwrap(); // Wait for first attempt to fail tokio::time::sleep(Duration::from_millis(300)).await; @@ -263,7 +263,7 @@ async fn test_max_attempts_honored(pool: PgPool) -> sqlx::Result<()> { claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await; + .await.unwrap(); // Wait for all attempts to complete let terminal = wait_for_task_terminal( diff --git a/tests/telemetry_test.rs b/tests/telemetry_test.rs index d1c430e..b722d8d 100644 --- a/tests/telemetry_test.rs +++ b/tests/telemetry_test.rs @@ -159,7 +159,7 @@ async fn test_task_lifecycle_metrics(pool: PgPool) -> sqlx::Result<()> { claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await; + .await.unwrap(); // Wait for task to complete wait_for_task_state( @@ -244,7 +244,7 @@ async fn test_task_failure_metrics(pool: PgPool) -> sqlx::Result<()> { claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await; + .await.unwrap(); // Wait for task to fail wait_for_task_state( @@ -288,7 +288,7 @@ async fn test_worker_gauge_metrics(pool: PgPool) -> sqlx::Result<()> { claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await; + .await.unwrap(); // Give the worker time to set its active gauge tokio::time::sleep(Duration::from_millis(200)).await; @@ -356,7 +356,7 @@ async fn test_checkpoint_metrics(pool: PgPool) -> sqlx::Result<()> { claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await; + .await.unwrap(); wait_for_task_state( &pool, @@ -416,7 +416,7 @@ async fn test_task_execution_duration_metrics(pool: PgPool) -> sqlx::Result<()> claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await; + .await.unwrap(); wait_for_task_state( &pool, From aaa77d9c460480eaff73f867780ef6803651a6af Mon Sep 17 00:00:00 2001 From: Viraj Mehta Date: Wed, 24 Dec 2025 11:29:49 -0500 Subject: [PATCH 3/4] formatted --- tests/checkpoint_test.rs | 21 ++++++++++++------ tests/concurrency_test.rs | 6 ++++-- tests/crash_test.rs | 33 ++++++++++++++++++---------- tests/event_test.rs | 30 +++++++++++++++++--------- tests/execution_test.rs | 45 ++++++++++++++++++++++++++------------- tests/fanout_test.rs | 27 +++++++++++++++-------- tests/lease_test.rs | 12 +++++++---- tests/partition_test.rs | 9 +++++--- tests/retry_test.rs | 12 +++++++---- tests/telemetry_test.rs | 15 ++++++++----- 10 files changed, 140 insertions(+), 70 deletions(-) diff --git a/tests/checkpoint_test.rs b/tests/checkpoint_test.rs index d5fcc9f..661902a 100644 --- a/tests/checkpoint_test.rs +++ b/tests/checkpoint_test.rs @@ -59,7 +59,8 @@ async fn test_checkpoint_prevents_step_reexecution(pool: PgPool) -> sqlx::Result claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); // Wait for task to fail (will hit max attempts since fail_after_step2 is always true) let terminal = wait_for_task_terminal( @@ -100,7 +101,8 @@ async fn test_checkpoint_prevents_step_reexecution(pool: PgPool) -> sqlx::Result claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); let terminal2 = wait_for_task_terminal( &pool, @@ -156,7 +158,8 @@ async fn test_deterministic_rand_preserved_on_retry(pool: PgPool) -> sqlx::Resul claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); let terminal = wait_for_task_terminal( &pool, @@ -219,7 +222,8 @@ async fn test_deterministic_now_preserved_on_retry(pool: PgPool) -> sqlx::Result claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); let terminal = wait_for_task_terminal( &pool, @@ -279,7 +283,8 @@ async fn test_deterministic_uuid7_preserved_on_retry(pool: PgPool) -> sqlx::Resu claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); let terminal = wait_for_task_terminal( &pool, @@ -328,7 +333,8 @@ async fn test_long_workflow_many_steps(pool: PgPool) -> sqlx::Result<()> { claim_timeout: Duration::from_secs(60), ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); let terminal = wait_for_task_terminal( &pool, @@ -380,7 +386,8 @@ async fn test_large_payload_checkpoint(pool: PgPool) -> sqlx::Result<()> { claim_timeout: Duration::from_secs(60), ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); let terminal = wait_for_task_terminal( &pool, diff --git a/tests/concurrency_test.rs b/tests/concurrency_test.rs index d4411f3..5c430bf 100644 --- a/tests/concurrency_test.rs +++ b/tests/concurrency_test.rs @@ -62,7 +62,8 @@ async fn test_task_claimed_by_exactly_one_worker(pool: PgPool) -> sqlx::Result<( concurrency: 1, ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); // Let workers run for a bit tokio::time::sleep(Duration::from_millis(500)).await; @@ -161,7 +162,8 @@ async fn test_concurrent_claims_with_skip_locked(pool: PgPool) -> sqlx::Result<( concurrency: 5, // Each worker handles multiple tasks ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); // Let workers process tasks tokio::time::sleep(Duration::from_secs(3)).await; diff --git a/tests/crash_test.rs b/tests/crash_test.rs index cf7f819..bedca5d 100644 --- a/tests/crash_test.rs +++ b/tests/crash_test.rs @@ -60,7 +60,8 @@ async fn test_crash_mid_step_resumes_from_checkpoint(pool: PgPool) -> sqlx::Resu claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); // Wait for some progress tokio::time::sleep(Duration::from_millis(500)).await; @@ -83,7 +84,8 @@ async fn test_crash_mid_step_resumes_from_checkpoint(pool: PgPool) -> sqlx::Resu claim_timeout: Duration::from_secs(5), // Short timeout to reclaim quickly ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); // Wait for task to reach terminal state let terminal = wait_for_task_terminal( @@ -133,7 +135,8 @@ async fn test_worker_drop_without_shutdown(pool: PgPool) -> sqlx::Result<()> { claim_timeout, ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); // Wait for task to start tokio::time::sleep(Duration::from_millis(500)).await; @@ -156,7 +159,8 @@ async fn test_worker_drop_without_shutdown(pool: PgPool) -> sqlx::Result<()> { claim_timeout: Duration::from_secs(60), // Longer timeout for second worker ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); // Give time for reclaim and some progress tokio::time::sleep(Duration::from_secs(2)).await; @@ -204,7 +208,8 @@ async fn test_lease_expiration_allows_reclaim(pool: PgPool) -> sqlx::Result<()> claim_timeout, ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); // Wait for task to start tokio::time::sleep(Duration::from_millis(200)).await; @@ -225,7 +230,8 @@ async fn test_lease_expiration_allows_reclaim(pool: PgPool) -> sqlx::Result<()> claim_timeout: Duration::from_secs(60), // Longer timeout this time ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); // Give second worker time to reclaim tokio::time::sleep(Duration::from_millis(500)).await; @@ -267,7 +273,8 @@ async fn test_heartbeat_prevents_lease_expiration(pool: PgPool) -> sqlx::Result< claim_timeout, ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); // Wait for task to complete let terminal = wait_for_task_terminal( @@ -316,7 +323,8 @@ async fn test_spawn_idempotency_after_retry(pool: PgPool) -> sqlx::Result<()> { concurrency: 2, // Handle parent and child ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); // Wait for parent to complete let terminal = wait_for_task_terminal( @@ -390,7 +398,8 @@ async fn test_step_idempotency_after_retry(pool: PgPool) -> sqlx::Result<()> { claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); let terminal = wait_for_task_terminal( &pool, @@ -457,7 +466,8 @@ async fn test_cpu_bound_outlives_lease(pool: PgPool) -> sqlx::Result<()> { claim_timeout, ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); // Wait for task to be claimed and start tokio::time::sleep(Duration::from_millis(200)).await; @@ -516,7 +526,8 @@ async fn test_slow_task_outlives_lease(pool: PgPool) -> sqlx::Result<()> { claim_timeout, ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); // Wait for task to be claimed and start sleeping tokio::time::sleep(Duration::from_millis(500)).await; diff --git a/tests/event_test.rs b/tests/event_test.rs index 70af0c9..dd198de 100644 --- a/tests/event_test.rs +++ b/tests/event_test.rs @@ -44,7 +44,8 @@ async fn test_emit_event_wakes_waiter(pool: PgPool) -> sqlx::Result<()> { claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); // Wait for task to start waiting tokio::time::sleep(Duration::from_millis(300)).await; @@ -117,7 +118,8 @@ async fn test_event_already_emitted_returns_immediately(pool: PgPool) -> sqlx::R claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); // Task should complete quickly since event exists let terminal = wait_for_task_terminal( @@ -174,7 +176,8 @@ async fn test_event_timeout_triggers(pool: PgPool) -> sqlx::Result<()> { claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); // Wait for task to fail due to timeout let terminal = wait_for_task_terminal( @@ -231,7 +234,8 @@ async fn test_multiple_waiters_same_event(pool: PgPool) -> sqlx::Result<()> { concurrency: 3, ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); // Wait for all tasks to start waiting tokio::time::sleep(Duration::from_millis(500)).await; @@ -295,7 +299,8 @@ async fn test_event_payload_preserved_on_retry(pool: PgPool) -> sqlx::Result<()> claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); // Wait for task to start waiting for event tokio::time::sleep(Duration::from_millis(300)).await; @@ -365,7 +370,8 @@ async fn test_event_last_write_wins(pool: PgPool) -> sqlx::Result<()> { claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); let terminal = wait_for_task_terminal( &pool, @@ -416,7 +422,8 @@ async fn test_multiple_distinct_events(pool: PgPool) -> sqlx::Result<()> { claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); // Wait for task to start tokio::time::sleep(Duration::from_millis(300)).await; @@ -486,7 +493,8 @@ async fn test_event_write_does_not_propagate_after_wake(pool: PgPool) -> sqlx::R claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); // Wait for task to start waiting for event tokio::time::sleep(Duration::from_millis(300)).await; @@ -557,7 +565,8 @@ async fn test_emit_from_different_task(pool: PgPool) -> sqlx::Result<()> { concurrency: 2, ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); // Wait for waiter to start tokio::time::sleep(Duration::from_millis(300)).await; @@ -731,7 +740,8 @@ async fn test_event_timeout_error_payload(pool: PgPool) -> sqlx::Result<()> { claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); // Wait for task to fail due to timeout let terminal = wait_for_task_terminal( diff --git a/tests/execution_test.rs b/tests/execution_test.rs index 7466932..876b2e5 100644 --- a/tests/execution_test.rs +++ b/tests/execution_test.rs @@ -87,7 +87,8 @@ async fn test_simple_task_executes_and_completes(pool: PgPool) -> sqlx::Result<( claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); // Wait for task to complete tokio::time::sleep(Duration::from_millis(500)).await; @@ -133,7 +134,8 @@ async fn test_task_state_transitions(pool: PgPool) -> sqlx::Result<()> { claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); // Wait for task to complete tokio::time::sleep(Duration::from_millis(500)).await; @@ -163,7 +165,8 @@ async fn test_empty_params_task_executes(pool: PgPool) -> sqlx::Result<()> { claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); tokio::time::sleep(Duration::from_millis(500)).await; worker.shutdown().await; @@ -200,7 +203,8 @@ async fn test_multi_step_task_completes_all_steps(pool: PgPool) -> sqlx::Result< claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); tokio::time::sleep(Duration::from_millis(500)).await; worker.shutdown().await; @@ -251,7 +255,8 @@ async fn test_multiple_tasks_execute_concurrently(pool: PgPool) -> sqlx::Result< concurrency: 5, ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); // Wait for all tasks to complete tokio::time::sleep(Duration::from_millis(1000)).await; @@ -290,7 +295,8 @@ async fn test_worker_concurrency_limit_respected(pool: PgPool) -> sqlx::Result<( concurrency: 2, // Only 2 at a time ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); // Give some time for processing tokio::time::sleep(Duration::from_millis(2000)).await; @@ -327,7 +333,8 @@ async fn test_worker_graceful_shutdown_waits(pool: PgPool) -> sqlx::Result<()> { claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); // Very short wait, then shutdown tokio::time::sleep(Duration::from_millis(200)).await; @@ -398,7 +405,8 @@ async fn test_task_result_stored_correctly(pool: PgPool) -> sqlx::Result<()> { claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); tokio::time::sleep(Duration::from_millis(500)).await; worker.shutdown().await; @@ -434,7 +442,8 @@ async fn test_research_task_readme_example(pool: PgPool) -> sqlx::Result<()> { claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); tokio::time::sleep(Duration::from_millis(500)).await; worker.shutdown().await; @@ -482,7 +491,8 @@ async fn test_convenience_methods_execute(pool: PgPool) -> sqlx::Result<()> { claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); tokio::time::sleep(Duration::from_millis(500)).await; worker.shutdown().await; @@ -533,7 +543,8 @@ async fn test_multiple_convenience_calls_produce_different_values( claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); tokio::time::sleep(Duration::from_millis(500)).await; worker.shutdown().await; @@ -576,7 +587,8 @@ async fn test_reserved_prefix_rejected(pool: PgPool) -> sqlx::Result<()> { claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); tokio::time::sleep(Duration::from_millis(500)).await; worker.shutdown().await; @@ -615,7 +627,8 @@ async fn test_reserved_prefix_error_payload(pool: PgPool) -> sqlx::Result<()> { claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); tokio::time::sleep(Duration::from_millis(500)).await; worker.shutdown().await; @@ -676,7 +689,8 @@ async fn test_long_running_task_with_heartbeat_completes(pool: PgPool) -> sqlx:: claim_timeout: Duration::from_secs(1), // 1 second claim timeout - task runs for 3x this duration ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); // Wait for task to complete (3 seconds + buffer) tokio::time::sleep(Duration::from_millis(4000)).await; @@ -795,7 +809,8 @@ async fn test_task_uses_application_state(pool: PgPool) -> sqlx::Result<()> { claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); // Wait for task to complete tokio::time::sleep(Duration::from_millis(500)).await; diff --git a/tests/fanout_test.rs b/tests/fanout_test.rs index 8e6bd9c..d5e7ddf 100644 --- a/tests/fanout_test.rs +++ b/tests/fanout_test.rs @@ -109,7 +109,8 @@ async fn test_spawn_single_child_and_join(pool: PgPool) -> sqlx::Result<()> { concurrency: 2, ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); // Wait for tasks to complete tokio::time::sleep(Duration::from_millis(2000)).await; @@ -157,7 +158,8 @@ async fn test_spawn_multiple_children_and_join(pool: PgPool) -> sqlx::Result<()> concurrency: 10, ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); // Wait for tasks to complete tokio::time::sleep(Duration::from_millis(3000)).await; @@ -207,7 +209,8 @@ async fn test_child_has_parent_task_id(pool: PgPool) -> sqlx::Result<()> { concurrency: 2, ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); tokio::time::sleep(Duration::from_millis(2000)).await; worker.shutdown().await; @@ -270,7 +273,8 @@ async fn test_child_failure_propagates_to_parent(pool: PgPool) -> sqlx::Result<( concurrency: 4, ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); // Wait for tasks to complete - longer since child needs to fail first, then parent tokio::time::sleep(Duration::from_millis(5000)).await; @@ -310,7 +314,8 @@ async fn test_cascade_cancel_when_parent_cancelled(pool: PgPool) -> sqlx::Result concurrency: 2, // Process both parent and child ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); // Give time for parent to spawn child and child to start tokio::time::sleep(Duration::from_millis(500)).await; @@ -375,7 +380,8 @@ async fn test_spawn_by_name_from_task_context(pool: PgPool) -> sqlx::Result<()> concurrency: 2, ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); // Wait for tasks to complete tokio::time::sleep(Duration::from_millis(2000)).await; @@ -436,7 +442,8 @@ async fn test_join_cancelled_child_returns_child_cancelled_error(pool: PgPool) - concurrency: 2, // Need concurrency for both parent and child ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); // Wait for child to be spawned and start tokio::time::sleep(Duration::from_millis(500)).await; @@ -524,7 +531,8 @@ async fn test_child_failed_error_contains_message(pool: PgPool) -> sqlx::Result< concurrency: 4, ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); // Wait for parent to fail let terminal = wait_for_task_terminal( @@ -604,7 +612,8 @@ async fn test_join_timeout_when_parent_claim_expires(pool: PgPool) -> sqlx::Resu concurrency: 2, ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); // Wait for parent to fail (should timeout when claim expires) let terminal = wait_for_task_terminal( diff --git a/tests/lease_test.rs b/tests/lease_test.rs index 557e498..3cbd81b 100644 --- a/tests/lease_test.rs +++ b/tests/lease_test.rs @@ -50,7 +50,8 @@ async fn test_claim_sets_correct_expiry(pool: PgPool) -> sqlx::Result<()> { claim_timeout, ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); // Wait for the task to be claimed tokio::time::sleep(Duration::from_millis(200)).await; @@ -108,7 +109,8 @@ async fn test_heartbeat_extends_lease(pool: PgPool) -> sqlx::Result<()> { claim_timeout, ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); // Wait for task to be claimed tokio::time::sleep(Duration::from_millis(200)).await; @@ -180,7 +182,8 @@ async fn test_checkpoint_extends_lease(pool: PgPool) -> sqlx::Result<()> { claim_timeout, ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); // Wait for task to start running tokio::time::sleep(Duration::from_millis(200)).await; @@ -247,7 +250,8 @@ async fn test_heartbeat_detects_cancellation(pool: PgPool) -> sqlx::Result<()> { claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); // Wait for task to start executing tokio::time::sleep(Duration::from_millis(500)).await; diff --git a/tests/partition_test.rs b/tests/partition_test.rs index 8a550cf..e02abfc 100644 --- a/tests/partition_test.rs +++ b/tests/partition_test.rs @@ -52,7 +52,8 @@ async fn test_db_connection_lost_during_checkpoint(pool: PgPool) -> sqlx::Result claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); // Wait for task to fail (will retry but always fail after step 2) let terminal = wait_for_task_terminal( @@ -117,7 +118,8 @@ async fn test_stale_worker_checkpoint_rejected(pool: PgPool) -> sqlx::Result<()> claim_timeout, ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); // Wait for task to be claimed tokio::time::sleep(Duration::from_millis(500)).await; @@ -135,7 +137,8 @@ async fn test_stale_worker_checkpoint_rejected(pool: PgPool) -> sqlx::Result<()> claim_timeout: Duration::from_secs(60), // Longer timeout ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); // Wait for reclaim to happen tokio::time::sleep(Duration::from_secs(2)).await; diff --git a/tests/retry_test.rs b/tests/retry_test.rs index fe692b1..fbc2ca8 100644 --- a/tests/retry_test.rs +++ b/tests/retry_test.rs @@ -49,7 +49,8 @@ async fn test_retry_strategy_none_no_retry(pool: PgPool) -> sqlx::Result<()> { claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); let terminal = wait_for_task_terminal( &pool, @@ -103,7 +104,8 @@ async fn test_retry_strategy_fixed_delay(pool: PgPool) -> sqlx::Result<()> { claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); // Wait for first attempt to fail tokio::time::sleep(Duration::from_millis(300)).await; @@ -183,7 +185,8 @@ async fn test_retry_strategy_exponential_backoff(pool: PgPool) -> sqlx::Result<( claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); // Wait for first attempt to fail tokio::time::sleep(Duration::from_millis(300)).await; @@ -263,7 +266,8 @@ async fn test_max_attempts_honored(pool: PgPool) -> sqlx::Result<()> { claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); // Wait for all attempts to complete let terminal = wait_for_task_terminal( diff --git a/tests/telemetry_test.rs b/tests/telemetry_test.rs index b722d8d..9971624 100644 --- a/tests/telemetry_test.rs +++ b/tests/telemetry_test.rs @@ -159,7 +159,8 @@ async fn test_task_lifecycle_metrics(pool: PgPool) -> sqlx::Result<()> { claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); // Wait for task to complete wait_for_task_state( @@ -244,7 +245,8 @@ async fn test_task_failure_metrics(pool: PgPool) -> sqlx::Result<()> { claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); // Wait for task to fail wait_for_task_state( @@ -288,7 +290,8 @@ async fn test_worker_gauge_metrics(pool: PgPool) -> sqlx::Result<()> { claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); // Give the worker time to set its active gauge tokio::time::sleep(Duration::from_millis(200)).await; @@ -356,7 +359,8 @@ async fn test_checkpoint_metrics(pool: PgPool) -> sqlx::Result<()> { claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); wait_for_task_state( &pool, @@ -416,7 +420,8 @@ async fn test_task_execution_duration_metrics(pool: PgPool) -> sqlx::Result<()> claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await.unwrap(); + .await + .unwrap(); wait_for_task_state( &pool, From d66f384895f76012b282983c2738760447723eac Mon Sep 17 00:00:00 2001 From: Viraj Mehta Date: Wed, 24 Dec 2025 11:48:48 -0500 Subject: [PATCH 4/4] fixed semantic conflicts --- tests/event_test.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/event_test.rs b/tests/event_test.rs index 7a193f6..6eb9e20 100644 --- a/tests/event_test.rs +++ b/tests/event_test.rs @@ -865,12 +865,13 @@ async fn test_event_race_stress(pool: PgPool) -> sqlx::Result<()> { let worker = client .start_worker(WorkerOptions { - poll_interval: 0.01, - claim_timeout: 60, + poll_interval: Duration::from_millis(10), + claim_timeout: Duration::from_secs(60), concurrency: 32, ..Default::default() }) - .await; + .await + .unwrap(); for round in 0..rounds { let mut task_ids = Vec::with_capacity(tasks_per_round);