feat(job): cooperative job cancellation via unified NOTIFY router #61

Closed
bodymindarts wants to merge 6 commits into main from feat/cooperative-cancel-job

Conversation

@bodymindarts
Member

Summary

Implements cooperative job cancellation that works for both pending and running jobs, building on the unified PG NOTIFY router from PR #59.

  • Pending jobs: cancelled synchronously by deleting the execution row and recording cancel events in a single transaction
  • Running jobs: cancelled via job_cancel notification through the existing job_events PG NOTIFY channel → CancellationToken → runner observes and returns JobCompletion::Cancelled
  • Force cancel: monitor task aborts the JoinHandle after cancel_timeout (default 30s) if the runner doesn't cooperate
  • Safety net: keep-alive sweep detects missed NOTIFY signals and triggers tokens for jobs with cancelled_at set
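
The running-job path and the force-cancel fallback described above can be sketched with the standard library only. This is an illustration under stated assumptions, not the PR's code: an `AtomicBool` stands in for the tokio-util `CancellationToken`, an OS thread stands in for the spawned task, and `CancelOutcome`, `run_job`, and `cancel_with_timeout` are hypothetical names. Unlike a tokio `JoinHandle`, a std thread cannot be aborted, so the "forced" branch only reports that the monitor gave up waiting.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc;
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// How a cancel request ended, from the monitor's point of view (hypothetical type).
#[derive(Debug, PartialEq, Eq)]
pub enum CancelOutcome {
    /// The runner observed the flag and returned on its own.
    Cooperative,
    /// The runner finished its work without ever observing the flag.
    Finished,
    /// The runner did not stop within `cancel_timeout`; a real monitor would abort it here.
    Forced,
}

/// Runs `steps` units of work, checking the flag between units when `cooperative` is true.
/// Returns true if it stopped because cancellation was requested.
fn run_job(cancel: &AtomicBool, steps: u32, step_time: Duration, cooperative: bool) -> bool {
    for _ in 0..steps {
        if cooperative && cancel.load(Ordering::SeqCst) {
            return true;
        }
        thread::sleep(step_time);
    }
    false
}

/// Starts a job, requests cancellation, then waits up to `cancel_timeout` for it to stop.
pub fn cancel_with_timeout(cooperative: bool, cancel_timeout: Duration) -> CancelOutcome {
    let cancel = Arc::new(AtomicBool::new(false));
    let (done_tx, done_rx) = mpsc::channel();

    let flag = Arc::clone(&cancel);
    thread::spawn(move || {
        // 50 steps of 10 ms: an uncooperative job runs for at least 500 ms.
        let cancelled = run_job(&flag, 50, Duration::from_millis(10), cooperative);
        let _ = done_tx.send(cancelled);
    });

    // Stand-in for the NOTIFY signal triggering the job's token.
    cancel.store(true, Ordering::SeqCst);

    match done_rx.recv_timeout(cancel_timeout) {
        Ok(true) => CancelOutcome::Cooperative,
        Ok(false) => CancelOutcome::Finished,
        // Timed out (or the worker vanished): the force-cancel path.
        Err(_) => CancelOutcome::Forced,
    }
}
```

Calling `cancel_with_timeout(true, Duration::from_millis(300))` should take the cooperative branch, while an uncooperative job with a 50 ms timeout falls through to `Forced`.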

Key changes

  • DB trigger on cancelled_at column fires job_cancel notification through unified router
  • CancellationTokens store (DashMap<JobId, CancellationToken>) for running jobs
  • CurrentJob exposes cancellation_requested() / cancellation_notified() for runners
  • JobCompletion::Cancelled variant and cancel_running_job dispatcher method
  • CancelResult enum (Cancelled | AlreadyCompleted | NotFound) for cancel_job() API
  • cancel_timeout config option (default 30s)
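
To make the `CancelResult` contract concrete, here is a minimal in-memory model of how `cancel_job()` might map job state to a result. Only the `CancelResult` variants come from this PR; `JobTable`, `JobState`, the `u64` job ids, and the `HashMap` plus `AtomicBool` stand-ins for `DashMap<JobId, CancellationToken>` are assumptions made for the sketch. The real implementation works against Postgres rows and entity events.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Mirrors the variants listed in the PR description.
#[derive(Debug, PartialEq, Eq)]
pub enum CancelResult {
    Cancelled,
    AlreadyCompleted,
    NotFound,
}

/// Hypothetical job states for this model only.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JobState {
    Pending,
    Running,
    Completed,
}

/// Hypothetical in-memory stand-in for the job tables and the token store.
#[derive(Default)]
pub struct JobTable {
    states: HashMap<u64, JobState>,
    tokens: HashMap<u64, Arc<AtomicBool>>,
}

impl JobTable {
    /// Adds a job; running jobs also get a cancellation flag, which is returned.
    pub fn insert(&mut self, id: u64, state: JobState) -> Option<Arc<AtomicBool>> {
        self.states.insert(id, state);
        if state == JobState::Running {
            let token = Arc::new(AtomicBool::new(false));
            self.tokens.insert(id, Arc::clone(&token));
            Some(token)
        } else {
            None
        }
    }

    pub fn cancel_job(&mut self, id: u64) -> CancelResult {
        match self.states.get(&id).copied() {
            None => CancelResult::NotFound,
            Some(JobState::Completed) => CancelResult::AlreadyCompleted,
            Some(JobState::Pending) => {
                // Stand-in for deleting the execution row in one transaction.
                self.states.remove(&id);
                CancelResult::Cancelled
            }
            Some(JobState::Running) => {
                // Stand-in for setting cancelled_at, which would fire the NOTIFY trigger.
                if let Some(token) = self.tokens.get(&id) {
                    token.store(true, Ordering::SeqCst);
                }
                CancelResult::Cancelled
            }
        }
    }
}
```

In this model a running job's flag is set immediately; in the PR the request reaches the runner asynchronously through the notification channel.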

Note: This PR is stacked on #59 (await-completion v2) and includes those commits.

Test plan

  • test_cancel_pending_job — cancels a pending job, asserts CancelResult::Cancelled
  • test_cancel_running_job_succeeds — cancels a running job cooperatively, asserts CancelResult::Cancelled
  • test_cancel_already_completed_job_is_idempotent — cancels a completed job, asserts CancelResult::AlreadyCompleted
  • All 40 tests pass (including 4 await_completion tests)
  • nix flake check passes (fmt, clippy --deny warnings, audit, deny)

🤖 Generated with Claude Code

@bodymindarts bodymindarts force-pushed the feat/cooperative-cancel-job branch from 2a8188b to 2896a1d Compare March 17, 2026 02:15
@bodymindarts bodymindarts marked this pull request as draft March 17, 2026 11:51
@bodymindarts bodymindarts force-pushed the feat/cooperative-cancel-job branch from 2896a1d to 7dd6434 Compare March 17, 2026 12:28
bodymindarts and others added 6 commits March 17, 2026 14:28
Allow job runners to attach a result value that callers receive through
await_completion. The result flows from runner → OnceLock → entity event →
JobCompletionResult without requiring any new migrations (backward-compatible
serde(default) on the JobCompleted event variant).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The new await_completion tests registered pollers for the shared
"test-job" type. Since nextest runs each test as a separate process
sharing the same Postgres database, pollers from different processes
competed for the same jobs. When a process exited before completing a
stolen job (tokio runtime drops, cancelling the shutdown task), the job
was left orphaned in 'running' state — causing
test_cancel_already_completed_job_is_idempotent to time out waiting for
its job to complete.

Fix: give each await_completion test its own job type via a new
AwaitTestJobInitializer so cross-process pollers never interfere.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…ss poller interference

Each test now gets its own unique job type via TestJobInitializer so that
nextest parallel processes don't steal each other's jobs from the shared
database. This extends the await_completion fix to all remaining tests
that shared the "test-job" type, which caused flaky timeouts.
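
One way to get a distinct job type per test is sketched below. `TestJobInitializer` itself is not shown in this PR, so `unique_job_type` is a hypothetical helper; combining the process id with a per-process counter is an assumption about what "unique" needs to mean when nextest runs each test in its own process against a shared database.

```rust
use std::process;
use std::sync::atomic::{AtomicU64, Ordering};

/// Per-process counter so two calls in the same process never collide.
static NEXT_ID: AtomicU64 = AtomicU64::new(0);

/// Builds a job type name that differs across tests, processes, and calls.
pub fn unique_job_type(test_name: &str) -> String {
    let n = NEXT_ID.fetch_add(1, Ordering::Relaxed);
    format!("test-job-{test_name}-{}-{n}", process::id())
}
```

A poller registered for such a name only sees jobs spawned by the same test, so a test process exiting early cannot strand another test's job.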

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace OnceLock with Mutex<Option> so callers can call set_result
multiple times. The last value set before completion or error is
persisted — enabling incremental progress tracking in batch jobs.
Partial results are preserved on error so callers can see how far
a job got before failing.

- Remove ResultAlreadySet error variant (multiple calls now allowed)
- Update dispatcher to use Mutex-based result holder
- Add tests for incremental set_result and partial progress on error
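
The "last value wins" behaviour can be illustrated with a small holder type. `ResultHolder`, `take`, and `run_batch` are hypothetical names, and `String` stands in for whatever result type the crate actually stores. With `OnceLock`, a second `set` call returns an error; a `Mutex<Option<_>>` simply overwrites the previous value.

```rust
use std::sync::Mutex;

/// Hypothetical result slot that can be overwritten any number of times.
#[derive(Default)]
pub struct ResultHolder {
    slot: Mutex<Option<String>>,
}

impl ResultHolder {
    /// Replaces whatever was stored before; the latest call wins.
    pub fn set_result(&self, value: impl Into<String>) {
        *self.slot.lock().unwrap() = Some(value.into());
    }

    /// Called once by the dispatcher when the job completes or errors.
    pub fn take(&self) -> Option<String> {
        self.slot.lock().unwrap().take()
    }
}

/// A batch job that records progress after every item and fails on a negative one.
pub fn run_batch(holder: &ResultHolder, items: &[i32]) -> Result<(), String> {
    for (index, item) in items.iter().enumerate() {
        if *item < 0 {
            return Err(format!("item {index} is negative"));
        }
        holder.set_result(format!("processed {}", index + 1));
    }
    Ok(())
}
```

If the third of four items fails, the holder still contains the progress recorded after the second item, which is what lets callers see how far the job got.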

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add cancel_job() API that cooperatively cancels both pending and running
jobs. Pending jobs are cancelled synchronously by deleting the execution
row and recording cancel events. Running jobs are cancelled via a
PG NOTIFY signal through the existing unified job_events channel, which
triggers a tokio-util CancellationToken that the runner can observe.

Key changes:
- DB trigger fires job_cancel notification when cancelled_at transitions
  from NULL to non-NULL
- CancellationTokens store (DashMap) maps running job IDs to tokens
- NotificationRouter routes job_cancel events to cancel tokens
- CurrentJob exposes cancellation_requested() / cancellation_notified()
- JobCompletion::Cancelled variant for cooperative cancellation
- Force-cancel monitor aborts JoinHandle after cancel_timeout (default 30s)
- Keep-alive sweep as safety net for missed NOTIFY signals
- CancelResult enum: Cancelled | AlreadyCompleted | NotFound
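
The routing and safety-net items above can be sketched as follows. This is a simplified model, not the crate's `NotificationRouter`: `JobEvent`, `route`, and `sweep` are hypothetical, a plain `HashMap` replaces `DashMap`, an `AtomicBool` replaces the tokio-util token, and the slice passed to `sweep` stands in for the ids a keep-alive query would return for rows with `cancelled_at` set.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical decoded payload from the shared notification channel.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JobEvent {
    JobCancel { job_id: u64 },
    Other,
}

/// Maps running job ids to their cancellation flags.
#[derive(Default)]
pub struct CancellationTokens {
    tokens: HashMap<u64, Arc<AtomicBool>>,
}

impl CancellationTokens {
    /// Called when a job starts running on this node.
    pub fn register(&mut self, job_id: u64) -> Arc<AtomicBool> {
        let token = Arc::new(AtomicBool::new(false));
        self.tokens.insert(job_id, Arc::clone(&token));
        token
    }

    /// Returns true only if the job is running here and was not already cancelled.
    fn trigger(&self, job_id: u64) -> bool {
        match self.tokens.get(&job_id) {
            Some(token) => !token.swap(true, Ordering::SeqCst),
            None => false,
        }
    }

    /// Routes one notification; only job_cancel events touch the tokens.
    pub fn route(&self, event: JobEvent) -> bool {
        match event {
            JobEvent::JobCancel { job_id } => self.trigger(job_id),
            JobEvent::Other => false,
        }
    }

    /// Safety net: triggers tokens for cancelled jobs whose notification was missed.
    /// Returns how many tokens were newly triggered.
    pub fn sweep(&self, cancelled_in_db: &[u64]) -> usize {
        cancelled_in_db.iter().filter(|id| self.trigger(**id)).count()
    }
}
```

Because `trigger` is idempotent, a job that receives both the notification and a later sweep is only counted once.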

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@bodymindarts bodymindarts force-pushed the feat/cooperative-cancel-job branch from 7dd6434 to f44b140 Compare March 17, 2026 13:50
@bodymindarts
Member Author

Closing: reverting the cancel_job approach. Command-center will handle cancellation at its own layer instead of relying on a cancel API in the upstream job crate.
