Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 0 additions & 39 deletions src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,45 +133,6 @@ impl JobDispatcher {
self.reschedule_job(&mut tx, job.id, t).await?;
tx.commit().await?;
}
Ok(JobCompletion::RescheduleIn(d)) => {
span.record("conclusion", "RescheduleIn");
let mut op = self.repo.begin_op_with_clock(&self.clock).await?;
let t = op.maybe_now().unwrap_or_else(|| self.clock.now());
let t = t + d;
self.reschedule_job(&mut op, job.id, t).await?;
op.commit().await?;
}
#[cfg(feature = "es-entity")]
Ok(JobCompletion::RescheduleInWithOp(mut op, d)) => {
span.record("conclusion", "RescheduleInWithOp");
let t = op.maybe_now().unwrap_or_else(|| self.clock.now());
let t = t + d;
self.reschedule_job(&mut op, job.id, t).await?;
op.commit().await?;
}
Ok(JobCompletion::RescheduleInWithTx(mut tx, d)) => {
span.record("conclusion", "RescheduleInWithOp");
let t = self.clock.now() + d;
self.reschedule_job(&mut tx, job.id, t).await?;
tx.commit().await?;
}
Ok(JobCompletion::RescheduleAt(t)) => {
span.record("conclusion", "RescheduleAt");
let mut op = self.repo.begin_op_with_clock(&self.clock).await?;
self.reschedule_job(&mut op, job.id, t).await?;
op.commit().await?;
}
#[cfg(feature = "es-entity")]
Ok(JobCompletion::RescheduleAtWithOp(mut op, t)) => {
span.record("conclusion", "RescheduleAtWithOp");
self.reschedule_job(&mut op, job.id, t).await?;
op.commit().await?;
}
Ok(JobCompletion::RescheduleAtWithTx(mut tx, t)) => {
span.record("conclusion", "RescheduleAtWithTx");
self.reschedule_job(&mut tx, job.id, t).await?;
tx.commit().await?;
}
}
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@
//! Jobs run immediately once a poller claims them. If you need a future start
//! time, schedule it up front with [`JobSpawner::spawn_at_in_op`]. After a
//! run completes, return [`JobCompletion::Complete`] for one-off work or use the
//! `JobCompletion::Reschedule*` variants to book the next execution.
//! `JobCompletion::RescheduleNow*` variants to book the next execution.
//!
//! ## Retries
//!
Expand Down
18 changes: 0 additions & 18 deletions src/runner.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
//! Traits and types used when defining job logic.

use async_trait::async_trait;
use chrono::{DateTime, Utc};
use serde::{Serialize, de::DeserializeOwned};

use super::{
Expand Down Expand Up @@ -64,23 +63,6 @@ pub enum JobCompletion {
RescheduleNowWithOp(es_entity::DbOp<'static>),
/// Schedule a new run immediately and return a transaction that the job service will commit.
RescheduleNowWithTx(sqlx::Transaction<'static, sqlx::Postgres>),
/// Schedule the next run after a delay.
RescheduleIn(std::time::Duration),
#[cfg(feature = "es-entity")]
/// Schedule the next run after a delay and return an `EsEntity` operation that the job service will commit.
RescheduleInWithOp(es_entity::DbOp<'static>, std::time::Duration),
/// Schedule the next run after a delay and return a transaction that the job service will commit.
RescheduleInWithTx(
sqlx::Transaction<'static, sqlx::Postgres>,
std::time::Duration,
),
/// Schedule the next run at an exact timestamp.
RescheduleAt(DateTime<Utc>),
#[cfg(feature = "es-entity")]
/// Schedule the next run at an exact timestamp and return an `EsEntity` operation that the job service will commit.
RescheduleAtWithOp(es_entity::DbOp<'static>, DateTime<Utc>),
/// Schedule the next run at an exact timestamp and return a transaction that the job service will commit.
RescheduleAtWithTx(sqlx::Transaction<'static, sqlx::Postgres>, DateTime<Utc>),
}

#[async_trait]
Expand Down
Loading