Commit 8c4dcc5

mihow and claude authored
fix: set stale jobs to "revoked" status instead of "pending" (#1169)
* fix: revoke stale jobs by default instead of setting PENDING

  update_stale_jobs previously checked Celery for task state, but AsyncResult returns PENDING for tasks it has no record of. This caused stale jobs to cycle through PENDING endlessly, and async_api jobs kept serving tasks to workers via the /tasks/ endpoint.

  Now: only trust Celery when it reports a known state (SUCCESS, FAILURE, etc). Otherwise revoke the job and clean up NATS/Redis resources. Also adds --dry-run flag.

  Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* refactor: extract check_stale_jobs() for reuse by periodic task

  Move core stale-job logic from management command into check_stale_jobs() in tasks.py. The management command is now a thin wrapper. Add tests for the extracted function.

  This prepares for #1025 which will call check_stale_jobs() from a Celery Beat periodic task.

  Co-Authored-By: Claude <noreply@anthropic.com>

* refactor: move jobs tests into tests/ package

  Consolidate ami/jobs/tests.py, test_tasks.py, and tests_update_stale_jobs.py into an ami/jobs/tests/ package with consistent test_ prefixes. All files are now discovered by Django's default test runner pattern.

  Co-Authored-By: Claude <noreply@anthropic.com>

* fix: correct stale job handling in check_stale_jobs()

  Four issues fixed:
  - Use states.READY_STATES instead of ALL_STATES - {PENDING} so non-terminal Celery states (STARTED, RETRY, RECEIVED) don't leave jobs stuck
  - Guard SUCCESS: only accept it when job.progress.is_complete(), matching the existing check in update_job_status(); otherwise revoke the job
  - Set finished_at in both the "updated" and "revoked" branches
  - Capture previous_status before calling update_status() so the result dict reflects the original state rather than the post-mutation REVOKED value

  Tests added for the updated-from-Celery-state and SUCCESS-with-incomplete-progress paths.

  Co-Authored-By: Claude <noreply@anthropic.com>

* fix: extend async_api progress guard to FAILURE, add cleanup to terminal branch

  Two issues from code review:
  - Guard FAILURE the same as SUCCESS for async_api jobs: if Celery reports SUCCESS or FAILURE but progress is incomplete, treat as non-terminal and revoke instead, matching the AsyncJobStateManager convention
  - Call cleanup_async_job_if_needed() in the is_terminal branch so NATS/Redis resources are freed for recovered jobs, not only for revoked ones

  Co-Authored-By: Claude <noreply@anthropic.com>

* fix: use select_for_update to prevent concurrent stale-job processing

  Fetch stale job PKs first, then re-acquire each inside transaction.atomic() with select_for_update(). The re-fetch re-checks running state and cutoff so a job handled by a concurrent run is skipped (DoesNotExist) rather than double-processed.

  Async resource cleanup (NATS/Redis) runs outside the transaction to avoid holding the row lock during network calls. Matches the pattern used by _fail_job() and _update_job_progress().

  Co-Authored-By: Claude <noreply@anthropic.com>

* fix: catch Celery backend errors in stale-job cleanup loop

  Wrap AsyncResult(task_id).state in try-except so a single broker/backend failure doesn't abort the entire batch. Failed lookups are logged and the job is revoked as if Celery state were unknown.

  Co-Authored-By: Claude <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
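The decision rule described in the commit message can be sketched as a small pure function. This is an illustrative reduction, not the code from the commit: `decide_action` and its parameters are hypothetical names, and the set of ready states is copied in by hand (it mirrors `celery.states.READY_STATES`) so the sketch runs without Celery installed.

```python
# Hand-copied mirror of celery.states.READY_STATES, so no Celery import is needed.
READY_STATES = frozenset({"SUCCESS", "FAILURE", "REVOKED"})


def decide_action(celery_state, is_async_api=False, progress_complete=False):
    """Return (action, new_status) for a stale job.

    Hypothetical helper: only terminal Celery states are trusted. For
    async_api jobs, SUCCESS and FAILURE count only when progress is complete.
    Everything else (None, PENDING, STARTED, RETRY, ...) leads to revocation.
    """
    is_terminal = celery_state in READY_STATES
    if is_async_api and celery_state in {"SUCCESS", "FAILURE"} and not progress_complete:
        is_terminal = False
    if is_terminal:
        return ("updated", celery_state)
    return ("revoked", "REVOKED")


if __name__ == "__main__":
    for state in (None, "PENDING", "STARTED", "FAILURE", "REVOKED"):
        print(state, "->", decide_action(state))
    print("async SUCCESS, incomplete ->", decide_action("SUCCESS", is_async_api=True))
```

Note that PENDING falls into the "revoked" branch: since Celery reports PENDING for task IDs it has no record of, that state carries no information about a stale job.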
1 parent 5283e23 commit 8c4dcc5

6 files changed

Lines changed: 235 additions & 22 deletions

Lines changed: 21 additions & 22 deletions
@@ -1,38 +1,37 @@
-from celery import states
-from celery.result import AsyncResult
 from django.core.management.base import BaseCommand
-from django.utils import timezone

-from ami.jobs.models import Job, JobState
+from ami.jobs.models import Job
+from ami.jobs.tasks import check_stale_jobs


 class Command(BaseCommand):
-    help = (
-        "Update the status of all jobs that are not in a final state " "and have not been updated in the last X hours."
-    )
+    help = "Revoke stale jobs that have not been updated within the cutoff period."

-    # Add argument for the number of hours to consider a job stale
     def add_arguments(self, parser):
         parser.add_argument(
             "--hours",
             type=int,
             default=Job.FAILED_CUTOFF_HOURS,
-            help="Number of hours to consider a job stale",
+            help="Number of hours to consider a job stale (default: %(default)s)",
+        )
+        parser.add_argument(
+            "--dry-run",
+            action="store_true",
+            help="Show what would be done without making changes",
         )

     def handle(self, *args, **options):
-        stale_jobs = Job.objects.filter(
-            status__in=JobState.running_states(),
-            updated_at__lt=timezone.now() - timezone.timedelta(hours=options["hours"]),
-        )
+        results = check_stale_jobs(hours=options["hours"], dry_run=options["dry_run"])
+
+        if not results:
+            self.stdout.write("No stale jobs found.")
+            return

-        for job in stale_jobs:
-            task = AsyncResult(job.task_id) if job.task_id else None
-            if task:
-                job.update_status(task.state, save=False)
-                job.save()
-                self.stdout.write(self.style.SUCCESS(f"Updated status of job {job.pk} to {task.state}"))
+        prefix = "[dry-run] " if options["dry_run"] else ""
+        for r in results:
+            if r["action"] == "updated":
+                self.stdout.write(
+                    self.style.SUCCESS(f"{prefix}Job {r['job_id']}: updated to {r['state']} (from Celery)")
+                )
             else:
-                self.stdout.write(self.style.WARNING(f"Job {job.pk} has no associated task, setting status to FAILED"))
-                job.update_status(states.FAILURE, save=False)
-                job.save()
+                self.stdout.write(self.style.WARNING(f"{prefix}Job {r['job_id']}: revoked (no known Celery state)"))
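The two argument changes in this command rely on standard `argparse` behaviour, which Django's management command parser builds on. The standalone sketch below shows how `%(default)s` is interpolated into help text and how `--dry-run` arrives in the options dict as `dry_run`. The program name `demo` and the value `72` are assumptions made for illustration; the real default comes from `Job.FAILED_CUTOFF_HOURS`, whose value is not shown in this commit.

```python
import argparse

# Hypothetical stand-in for Job.FAILED_CUTOFF_HOURS (the real value is not shown here).
DEFAULT_CUTOFF_HOURS = 72


def build_parser():
    parser = argparse.ArgumentParser(prog="demo")
    parser.add_argument(
        "--hours",
        type=int,
        default=DEFAULT_CUTOFF_HOURS,
        # argparse substitutes %(default)s with the default value when rendering help.
        help="Number of hours to consider a job stale (default: %(default)s)",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",  # False unless the flag is present
        help="Show what would be done without making changes",
    )
    return parser


parser = build_parser()
# Dashes in option names become underscores in the parsed namespace.
options = vars(parser.parse_args(["--dry-run"]))
help_text = parser.format_help()

if __name__ == "__main__":
    print(options)
    print(help_text)
```

Because `store_true` defaults to `False`, existing invocations that pass only `--hours` (or nothing) keep their mutating behaviour.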

ami/jobs/tasks.py

Lines changed: 91 additions & 0 deletions
@@ -316,6 +316,97 @@ def _update_job_progress(
     cleanup_async_job_if_needed(job)


+def check_stale_jobs(hours: int | None = None, dry_run: bool = False) -> list[dict]:
+    """
+    Find jobs stuck in a running state past the cutoff and revoke them.
+
+    For each stale job, checks Celery for a terminal task status. REVOKED is
+    always trusted. For async_api jobs, SUCCESS and FAILURE are only accepted
+    when job.progress.is_complete() — NATS workers may still be delivering
+    results after the Celery task finishes. All other cases result in revocation.
+    Async resources (NATS/Redis) are cleaned up in both branches.
+
+    Returns a list of dicts describing what was done to each job.
+    """
+    import datetime
+
+    from celery import states
+    from celery.result import AsyncResult
+    from django.db import transaction
+
+    from ami.jobs.models import Job, JobDispatchMode, JobState
+
+    if hours is None:
+        hours = Job.FAILED_CUTOFF_HOURS
+
+    cutoff = datetime.datetime.now() - datetime.timedelta(hours=hours)
+    stale_pks = list(
+        Job.objects.filter(
+            status__in=JobState.running_states(),
+            updated_at__lt=cutoff,
+        ).values_list("pk", flat=True)
+    )
+
+    results = []
+    for pk in stale_pks:
+        with transaction.atomic():
+            try:
+                job = Job.objects.select_for_update().get(
+                    pk=pk,
+                    status__in=JobState.running_states(),
+                    updated_at__lt=cutoff,
+                )
+            except Job.DoesNotExist:
+                # Another concurrent run already handled this job.
+                continue
+
+            celery_state = None
+            if job.task_id:
+                try:
+                    celery_state = AsyncResult(job.task_id).state
+                except Exception:
+                    logger.warning(
+                        "Failed to fetch Celery state for stale job %s (task_id=%s)",
+                        job.pk,
+                        job.task_id,
+                        exc_info=True,
+                    )
+                    # Treat as unknown state — job will be revoked below.
+
+            # Only trust terminal Celery states. For async_api jobs, SUCCESS and
+            # FAILURE are only accepted when progress is complete — NATS workers may
+            # still be delivering results after the Celery task finishes.
+            is_terminal = celery_state in states.READY_STATES
+            is_async_api = job.dispatch_mode == JobDispatchMode.ASYNC_API
+            if is_async_api and celery_state in {states.SUCCESS, states.FAILURE} and not job.progress.is_complete():
+                is_terminal = False
+
+            previous_status = job.status
+            if is_terminal:
+                if not dry_run:
+                    job.update_status(celery_state, save=False)
+                    job.finished_at = datetime.datetime.now()
+                    job.save()
+            else:
+                if not dry_run:
+                    job.update_status(JobState.REVOKED, save=False)
+                    job.finished_at = datetime.datetime.now()
+                    job.save()
+
+        # Async resource cleanup runs outside the transaction — it makes network
+        # calls (NATS/Redis) that should not hold the DB row lock.
+        if not dry_run:
+            job.refresh_from_db()
+            cleanup_async_job_if_needed(job)
+
+        if is_terminal:
+            results.append({"job_id": job.pk, "action": "updated", "state": celery_state})
+        else:
+            results.append({"job_id": job.pk, "action": "revoked", "previous_status": previous_status})
+
+    return results
+
+
 def cleanup_async_job_if_needed(job) -> None:
     """
     Clean up async resources (NATS/Redis) if this job uses them.
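The concurrency pattern in `check_stale_jobs()` (snapshot the candidate PKs without a lock, then re-check each one after acquiring the lock) can be illustrated without a database. The sketch below is only an in-memory analogy: a `threading.Lock` stands in for the row lock taken by `select_for_update()`, a dict stands in for the `Job` table, and the `RUNNING` set is a guess at what `JobState.running_states()` might contain. None of these names come from the commit.

```python
import threading

# Hypothetical in-memory stand-in for the Job table: pk -> status.
jobs = {1: "STARTED", 2: "STARTED", 3: "SUCCESS"}
# Assumed running states; the real list comes from JobState.running_states().
RUNNING = {"CREATED", "PENDING", "STARTED"}
# Stands in for the row lock acquired by select_for_update().
table_lock = threading.Lock()


def revoke_stale(processed):
    # Step 1: snapshot candidate keys without holding the lock.
    candidates = [pk for pk, status in list(jobs.items()) if status in RUNNING]
    for pk in candidates:
        # Step 2: re-check the condition under the lock before mutating.
        with table_lock:
            if jobs[pk] not in RUNNING:
                # Another concurrent run already handled this job.
                continue
            jobs[pk] = "REVOKED"
            processed.append(pk)
        # Slow cleanup (network calls in the real code) would go here,
        # after the lock has been released.


processed = []
threads = [threading.Thread(target=revoke_stale, args=(processed,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

if __name__ == "__main__":
    print(sorted(processed), jobs)
```

Even though four workers may all see the same candidates in step 1, the re-check in step 2 means each job is revoked exactly once.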

ami/jobs/tests/__init__.py

Whitespace-only changes.
Lines changed: 123 additions & 0 deletions
@@ -0,0 +1,123 @@
+from datetime import timedelta
+from unittest.mock import patch
+
+from django.test import TestCase
+from django.utils import timezone
+
+from ami.jobs.models import Job, JobDispatchMode, JobState
+from ami.jobs.tasks import check_stale_jobs
+from ami.main.models import Project
+
+
+class CheckStaleJobsTest(TestCase):
+    def setUp(self):
+        self.project = Project.objects.create(name="Stale jobs test project")
+
+    def _create_job(self, status=JobState.STARTED, hours_ago=100, task_id=None):
+        job = Job.objects.create(
+            project=self.project,
+            name=f"Test job {status}",
+            status=status,
+        )
+        Job.objects.filter(pk=job.pk).update(
+            updated_at=timezone.now() - timedelta(hours=hours_ago),
+        )
+        if task_id is not None:
+            Job.objects.filter(pk=job.pk).update(task_id=task_id)
+        job.refresh_from_db()
+        return job
+
+    @patch("ami.jobs.tasks.cleanup_async_job_if_needed")
+    def test_dry_run(self, mock_cleanup):
+        """Dry run returns results without modifying jobs."""
+        job = self._create_job(status=JobState.STARTED)
+
+        results = check_stale_jobs(dry_run=True)
+
+        self.assertEqual(len(results), 1)
+        self.assertEqual(results[0]["action"], "revoked")
+        job.refresh_from_db()
+        self.assertEqual(job.status, JobState.STARTED.value)
+        mock_cleanup.assert_not_called()
+
+    @patch("ami.jobs.tasks.cleanup_async_job_if_needed")
+    def test_revokes_stale_job(self, mock_cleanup):
+        """Stale job without a known Celery state is revoked and cleaned up."""
+        job = self._create_job(status=JobState.STARTED)
+
+        results = check_stale_jobs()
+
+        self.assertEqual(len(results), 1)
+        result = results[0]
+        self.assertEqual(result["action"], "revoked")
+        self.assertEqual(result["previous_status"], JobState.STARTED)
+        job.refresh_from_db()
+        self.assertEqual(job.status, JobState.REVOKED.value)
+        self.assertIsNotNone(job.finished_at)
+        mock_cleanup.assert_called_once_with(job)
+
+    @patch("ami.jobs.tasks.cleanup_async_job_if_needed")
+    @patch("celery.result.AsyncResult")
+    def test_updates_status_from_known_celery_state(self, mock_async_result, mock_cleanup):
+        """Stale job with a terminal Celery state is updated (not revoked)."""
+        from celery import states
+
+        mock_async_result.return_value.state = states.FAILURE
+        job = self._create_job(status=JobState.STARTED, task_id="some-celery-task-id")
+
+        results = check_stale_jobs()
+
+        self.assertEqual(len(results), 1)
+        result = results[0]
+        self.assertEqual(result["action"], "updated")
+        self.assertEqual(result["state"], states.FAILURE)
+        job.refresh_from_db()
+        self.assertEqual(job.status, JobState.FAILURE.value)
+        self.assertIsNotNone(job.finished_at)
+        mock_cleanup.assert_called_once_with(job)
+
+    @patch("ami.jobs.tasks.cleanup_async_job_if_needed")
+    @patch("celery.result.AsyncResult")
+    def test_revokes_success_with_incomplete_progress(self, mock_async_result, mock_cleanup):
+        """async_api job where Celery reports SUCCESS but progress is incomplete is revoked."""
+        from celery import states
+
+        mock_async_result.return_value.state = states.SUCCESS
+        job = self._create_job(status=JobState.STARTED, task_id="some-celery-task-id")
+        Job.objects.filter(pk=job.pk).update(dispatch_mode=JobDispatchMode.ASYNC_API)
+        job.refresh_from_db()
+        # job.progress.is_complete() returns False by default (no stages completed)
+
+        results = check_stale_jobs()
+
+        self.assertEqual(len(results), 1)
+        self.assertEqual(results[0]["action"], "revoked")
+        job.refresh_from_db()
+        self.assertEqual(job.status, JobState.REVOKED.value)
+        mock_cleanup.assert_called_once_with(job)
+
+    @patch("ami.jobs.tasks.cleanup_async_job_if_needed")
+    @patch("celery.result.AsyncResult")
+    def test_revokes_when_celery_lookup_fails(self, mock_async_result, mock_cleanup):
+        """Job is revoked if Celery state lookup raises an exception."""
+        mock_async_result.side_effect = ConnectionError("broker down")
+        job = self._create_job(status=JobState.STARTED, task_id="unreachable-task")
+
+        results = check_stale_jobs()
+
+        self.assertEqual(len(results), 1)
+        self.assertEqual(results[0]["action"], "revoked")
+        job.refresh_from_db()
+        self.assertEqual(job.status, JobState.REVOKED.value)
+        mock_cleanup.assert_called_once_with(job)
+
+    @patch("ami.jobs.tasks.cleanup_async_job_if_needed")
+    def test_skips_recent_and_final_state_jobs(self, mock_cleanup):
+        """Recent jobs and jobs in final states are not touched."""
+        self._create_job(status=JobState.STARTED, hours_ago=1)  # recent
+        self._create_job(status=JobState.SUCCESS, hours_ago=200)  # final state
+
+        results = check_stale_jobs()
+
+        self.assertEqual(results, [])
+        mock_cleanup.assert_not_called()
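These tests patch `celery.result.AsyncResult` at its source module, not at `ami.jobs.tasks`. That appears to work because `check_stale_jobs()` imports `AsyncResult` inside the function body, so the name is looked up on the module each time the function runs. The sketch below reproduces the mechanism with the standard library only: `json.dumps` is an arbitrary stand-in for `AsyncResult`, and `lookup_state` is a hypothetical helper that simplifies the real code (it returns the call result directly instead of reading a `.state` attribute).

```python
from unittest.mock import patch


def lookup_state(task_id):
    """Hypothetical stand-in for the Celery lookup in check_stale_jobs()."""
    # Function-local import: resolved against the json module on every call,
    # so a patch applied to "json.dumps" is visible here.
    from json import dumps

    try:
        return dumps(task_id)
    except Exception:
        # Mirrors the commit's behaviour: a failed lookup means "unknown state".
        return None


# Patch the source module and set a return value, as the tests do for AsyncResult.
with patch("json.dumps") as mock_dumps:
    mock_dumps.return_value = "FAILURE"
    patched_value = lookup_state("abc")

# side_effect makes the call raise, exercising the except branch.
with patch("json.dumps") as mock_dumps:
    mock_dumps.side_effect = ConnectionError("broker down")
    failed_value = lookup_state("abc")

# Outside the patch context the real function is back in place.
unpatched_value = lookup_state("abc")

if __name__ == "__main__":
    print(patched_value, failed_value, unpatched_value)
```

If `AsyncResult` were instead imported at the top of `tasks.py`, the patch target would likely need to be `ami.jobs.tasks.AsyncResult`, since the module-level name would already be bound before the patch is applied.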
