
Commit 05ad870

Authored by: carlosgjs, carlos-irreverentlabs, mihow, claude, and Copilot
Support ML async job cancellation, fail jobs on redis errors (#1162)
* merge

* fix: PSv2 follow-up fixes from integration tests (#1135)

  * fix: prevent NATS connection flooding and stale job task fetching

    - Add connect_timeout=5, allow_reconnect=False to NATS connections to prevent leaked reconnection loops from blocking Django's event loop
    - Guard /tasks endpoint against terminal-status jobs (return empty tasks instead of attempting NATS reserve)
    - IncompleteJobFilter now excludes jobs by top-level status in addition to progress JSON stages
    - Add stale worker cleanup to integration test script

    Found during PSv2 integration testing where stale ADC workers with default DataLoader parallelism overwhelmed the single uvicorn worker thread by flooding /tasks with concurrent NATS reserve requests.

    Co-Authored-By: Claude <noreply@anthropic.com>

  * docs: PSv2 integration test session notes and NATS flooding findings

    Session notes from 2026-02-16 integration test including root cause analysis of stale worker task competition and NATS connection issues. Findings doc tracks applied fixes and remaining TODOs with priorities.

    Co-Authored-By: Claude <noreply@anthropic.com>

  * docs: update session notes with successful test run #3

    PSv2 integration test passed end-to-end (job 1380, 20/20 images). Identified ack_wait=300s as cause of ~5min idle time when GPU processes race for NATS tasks.

    Co-Authored-By: Claude <noreply@anthropic.com>

  * fix: batch NATS task fetch to prevent HTTP timeouts

    Replace N×1 reserve_task() calls with single reserve_tasks() batch fetch. The previous implementation created a new pull subscription per message (320 NATS round trips for batch=64), causing the /tasks endpoint to exceed HTTP client timeouts. The new approach uses one psub.fetch() call for the entire batch.

    Co-Authored-By: Claude <noreply@anthropic.com>

  * docs: add next session prompt

  * feat: add pipeline__slug__in filter for multi-pipeline job queries

    Workers that handle multiple pipelines can now fetch jobs for all of them in a single request: ?pipeline__slug__in=slug1,slug2

    Co-Authored-By: Claude <noreply@anthropic.com>

  * chore: remove local-only docs and scripts from branch

    These files are session notes, planning docs, and test scripts that should stay local rather than be part of the PR.

    Co-Authored-By: Claude <noreply@anthropic.com>

  * feat: set job dispatch_mode at creation time based on project feature flags

    ML jobs with a pipeline now get dispatch_mode set during setup() instead of waiting until run() is called by the Celery worker. This lets the UI show the correct mode immediately after job creation.

    Co-Authored-By: Claude <noreply@anthropic.com>

  * fix: add timeouts to all JetStream operations and restore reconnect policy

    Add NATS_JETSTREAM_TIMEOUT (10s) to all JetStream metadata operations via asyncio.wait_for() so a hung NATS connection fails fast instead of blocking the caller's thread indefinitely. Also restore the intended reconnect policy (2 attempts, 1s wait) that was lost in a prior force push.

    Co-Authored-By: Claude <noreply@anthropic.com>

  * fix: propagate NATS timeouts as 503 instead of swallowing them

    asyncio.TimeoutError from _ensure_stream() and _ensure_consumer() was caught by the broad `except Exception` in reserve_tasks(), silently returning [] and making NATS outages indistinguishable from empty queues. Workers would then poll immediately, recreating the flooding problem.

    - Add explicit `except asyncio.TimeoutError: raise` in reserve_tasks()
    - Catch TimeoutError and OSError in the /tasks view, return 503
    - Restore allow_reconnect=False (fail-fast on connection issues)
    - Add return type annotation to get_connection()

    Co-Authored-By: Claude <noreply@anthropic.com>

  * fix: address review comments (log level, fetch timeout, docstring)

    - Downgrade reserve_tasks log to DEBUG when zero tasks reserved (avoid log spam from frequent polling)
    - Pass timeout=0.5 from /tasks endpoint to avoid blocking the worker for 5s on empty queues
    - Fix docstring examples using string 'job123' for int-typed job_id

    Co-Authored-By: Claude <noreply@anthropic.com>

  * fix: catch nats.errors.Error in /tasks endpoint for proper 503 responses

    NoServersError, ConnectionClosedError, and other NATS exceptions inherit from nats.errors.Error (not OSError), so they escaped the handler and returned 500 instead of 503.

    Co-Authored-By: Claude <noreply@anthropic.com>

  ---------

  Co-authored-by: Claude <noreply@anthropic.com>

* PSv2: Improve task fetching & web worker concurrency configuration (#1142)

  * feat: configurable NATS tuning and gunicorn worker management

    Rebase onto main after #1135 merge. Keep only the additions unique to this branch:

    - Make TASK_TTR configurable via NATS_TASK_TTR Django setting (default 30s)
    - Make max_ack_pending configurable via NATS_MAX_ACK_PENDING setting (default 100)
    - Local dev: switch to gunicorn+UvicornWorker by default for production parity, with USE_UVICORN=1 escape hatch for raw uvicorn
    - Production: auto-detect WEB_CONCURRENCY from CPU cores (capped at 8) when not explicitly set in the environment

    Co-Authored-By: Claude <noreply@anthropic.com>

  * fix: address PR review comments

    - Fix max_ack_pending falsy-zero guard (use `is not None` instead of `or`)
    - Update TaskQueueManager docstring with Args section
    - Simplify production WEB_CONCURRENCY fallback (just use nproc)

    Co-Authored-By: Claude <noreply@anthropic.com>

  ---------

  Co-authored-by: Michael Bunsen <notbot@gmail.com>
  Co-authored-by: Claude <noreply@anthropic.com>

* fix: include pipeline_slug in MinimalJobSerializer (#1148)

  * fix: include pipeline_slug in MinimalJobSerializer (ids_only response)

    The ADC worker fetches jobs with ids_only=1 and expects pipeline_slug in the response to know which pipeline to run. Without it, Pydantic validation fails and the worker skips the job.

    Co-Authored-By: Claude <noreply@anthropic.com>

  * Update ami/jobs/serializers.py

    Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

  ---------

  Co-authored-by: Claude <noreply@anthropic.com>
  Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Avoid redis based locking by using atomic updates

* Test concurrency

* Increase max ack pending

* update comment

* CR feedback

* Cancel jobs if Redis state is missing

* Add chaos monkey

* CR feedback

* CR 2

* fix: OrderedEnum comparisons now override str MRO in subclasses

  JobState(str, OrderedEnum) was using str's lexicographic __gt__ instead of OrderedEnum's definition-order __gt__, because str comes first in the MRO. This caused max(FAILURE, SUCCESS) to return SUCCESS, silently discarding failure state in concurrent job progress updates.

  Fix: __init_subclass__ injects comparison methods directly onto each subclass so they take MRO priority over data-type mixins.

  Also preserve FAILURE status through the progress ternary when progress < 1.0, so early failure detection isn't overwritten.

  Co-Authored-By: Claude <noreply@anthropic.com>

* fix: correct misleading error log about NATS redelivery

  The NATS message is ACK'd at line 145, before update_state() and _update_job_progress(). If either of those raises, the except block was logging "NATS will redeliver" when it won't.

  Co-Authored-By: Claude <noreply@anthropic.com>

* Use job.logger

* Use job.logger

* Integrate cancellation support

* merge, update tests

* Remove pause support in monkey

* fix: cancel async jobs by cleaning up NATS/Redis and stopping task delivery

  For async_api jobs, the Celery task completes after queuing images to NATS, so task.revoke() has no effect. The worker kept pulling tasks via the /tasks endpoint because it only checked final_states(), not CANCELING.

  - Add JobState.active_states() (STARTED, RETRY) for positive task-serving check
  - /tasks endpoint returns empty unless job is in active_states()
  - Job.cancel() for async_api jobs: clean up NATS/Redis, then set REVOKED

  Co-Authored-By: Claude <noreply@anthropic.com>

* fix(ui): hide Retry button while job is in CANCELING state

  canRetry now excludes CANCELING so the Retry button stays hidden during the drain period, matching the backend's transitional state.

  Co-Authored-By: Claude <noreply@anthropic.com>

* fix: downgrade Redis-missing log to warning for canceled jobs

  When a job is canceled, NATS/Redis cleanup runs before in-flight results finish processing. The resulting "Redis state missing" message is expected, not an error.

  Co-Authored-By: Claude <noreply@anthropic.com>

* docs: add async job monitoring reference

  Covers all monitoring points for NATS async jobs: Django ORM, REST API, tasks endpoint, NATS consumer state, Redis counters, Docker logs, and AMI worker logs. Linked from CLAUDE.md and the test_ml_job_e2e command.

  Co-Authored-By: Claude <noreply@anthropic.com>

* fix: update tests for active_states() guard on /tasks endpoint

  Tests need to set job status to STARTED since the /tasks endpoint now only serves tasks for jobs in active_states() (STARTED, RETRY).

  Co-Authored-By: Claude <noreply@anthropic.com>

* fix: improve job cancel ordering, fail status sync, and log handler safety

  - Reorder cancel(): revoke Celery task before cleaning up async resources to prevent a theoretical race where a worker recreates state after cleanup
  - Remove redundant self.save() after task.revoke() (no fields changed)
  - Use update_status() in _fail_job() to keep progress.summary.status in sync with job.status
  - Wrap entire log handler emit() DB sequence (refresh_from_db + mutations + save) in try/except so a DB failure during logging cannot crash callers

  Co-Authored-By: Claude <noreply@anthropic.com>

* fix: restore timeout on _stream_exists and use settings for NATS_URL

  - Add asyncio.wait_for() wrapper to _stream_exists() stream_info call, accidentally dropped during refactor from _ensure_stream
  - Read NATS_URL from Django settings in chaos_monkey command instead of hardcoding, consistent with TaskQueueManager

  Co-Authored-By: Claude <noreply@anthropic.com>

* fix(ui): block retry button while job is in RETRY state

  RETRY is an active processing state; allowing another retry while one is already running could cause duplicate execution.

  Co-Authored-By: Claude <noreply@anthropic.com>

* docs: clarify _stream_exists timeout propagation design

  Add docstring explaining that TimeoutError is deliberately not caught — an unreachable NATS server should be a hard failure, not a "stream missing" false negative. Multiple reviewers questioned this behavior.

  Co-Authored-By: Claude <noreply@anthropic.com>

* docs: add language tag to fenced code block in monitoring guide

  Co-Authored-By: Claude <noreply@anthropic.com>

---------

Co-authored-by: Carlos Garcia Jurado Suarez <carlos@irreverentlabs.com>
Co-authored-by: Michael Bunsen <notbot@gmail.com>
Co-authored-by: Claude <noreply@anthropic.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
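The OrderedEnum change described in the commit message is subtle, so a minimal, self-contained sketch of the same pattern may help. The class and member names below (`OrderedEnum`, `JobStateDemo`) are illustrative only and are not the project's actual definitions:

```python
import enum
import operator


class OrderedEnum(enum.Enum):
    """Enum whose members compare by definition order, even with a str mixin."""

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)

        def make_cmp(op):
            def cmp(self, other):
                if not isinstance(other, cls):
                    return NotImplemented
                members = list(cls)  # definition order; resolved at call time
                return op(members.index(self), members.index(other))
            return cmp

        # Inject comparisons directly onto the subclass so they take MRO priority
        # over data-type mixins such as str, whose lexicographic __gt__ would
        # otherwise win in `class JobState(str, OrderedEnum)`.
        for name, op in (("__lt__", operator.lt), ("__le__", operator.le),
                         ("__gt__", operator.gt), ("__ge__", operator.ge)):
            setattr(cls, name, make_cmp(op))


class JobStateDemo(str, OrderedEnum):
    SUCCESS = "SUCCESS"
    FAILURE = "FAILURE"  # defined later, so it outranks SUCCESS in definition order


# With str's lexicographic ordering, max() would return SUCCESS ("S" > "F");
# definition-order comparison keeps the failure state from being discarded.
assert max(JobStateDemo.FAILURE, JobStateDemo.SUCCESS) is JobStateDemo.FAILURE
```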
1 parent 12a3c70 commit 05ad870

12 files changed

Lines changed: 430 additions & 54 deletions


.agents/AGENTS.md

Lines changed: 10 additions & 0 deletions
@@ -650,6 +650,16 @@ images = SourceImage.objects.annotate(det_count=Count('detections'))
 - Use `@shared_task` decorator for all tasks
 - Check Flower UI for debugging: http://localhost:5555
 
+### E2E Testing & Monitoring Async Jobs
+
+Run an end-to-end ML job test:
+
+```bash
+docker compose run --rm django python manage.py test_ml_job_e2e \
+  --project 18 --dispatch-mode async_api --collection 142 --pipeline "global_moths_2024"
+```
+
+For monitoring running jobs (Django ORM, REST API, NATS consumer state, Redis counters, worker logs, etc.), see `docs/claude/reference/monitoring-async-jobs.md`.
+
 ### Running a Single Test
 
 ```bash
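As a concrete example of the Django ORM monitoring point mentioned in the new doc section, a quick check from `docker compose run --rm django python manage.py shell` might look like the sketch below. Only `status` and `progress.summary.status` are taken from this commit; the query itself is illustrative:

```python
# Hedged sketch: inspect the most recent job's top-level status and the status
# stored in its progress summary (the two fields this PR keeps in sync).
from ami.jobs.models import Job

job = Job.objects.order_by("-pk").first()  # most recently created job (illustrative query)
if job is not None:
    print(job.pk, job.status, job.progress.summary.status)
```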
chaos_monkey management command (new file)

Lines changed: 97 additions & 0 deletions

"""
Fault injection utility for manual chaos testing of ML async jobs.

Use alongside `test_ml_job_e2e` to verify job behaviour when Redis or NATS
becomes unavailable or loses state mid-processing.

Usage examples:

    # Flush all Redis state immediately (simulates FLUSHDB mid-job)
    python manage.py chaos_monkey flush redis

    # Flush all NATS JetStream streams (simulates broker state loss)
    python manage.py chaos_monkey flush nats
"""

from asgiref.sync import async_to_sync
from django.conf import settings
from django.core.management.base import BaseCommand, CommandError
from django_redis import get_redis_connection

NATS_URL = getattr(settings, "NATS_URL", "nats://nats:4222")


class Command(BaseCommand):
    help = "Inject faults into Redis or NATS for chaos/resilience testing"

    def add_arguments(self, parser):
        parser.add_argument(
            "action",
            choices=["flush"],
            help="flush: wipe all state.",
        )
        parser.add_argument(
            "service",
            choices=["redis", "nats"],
            help="Target service to fault.",
        )

    def handle(self, *args, **options):
        action = options["action"]
        service = options["service"]

        if action == "flush" and service == "redis":
            self._flush_redis()
        elif action == "flush" and service == "nats":
            self._flush_nats()

    # ------------------------------------------------------------------
    # Redis
    # ------------------------------------------------------------------

    def _flush_redis(self):
        self.stdout.write("Flushing Redis database (FLUSHDB)...")
        try:
            redis = get_redis_connection("default")
            redis.flushdb()
            self.stdout.write(self.style.SUCCESS("Redis flushed."))
        except Exception as e:
            raise CommandError(f"Failed to flush Redis: {e}") from e

    # ------------------------------------------------------------------
    # NATS
    # ------------------------------------------------------------------

    def _flush_nats(self):
        """Delete all JetStream streams via the NATS Python client."""
        self.stdout.write("Flushing all NATS JetStream streams...")

        async def _delete_all_streams():
            import nats

            nc = await nats.connect(NATS_URL, connect_timeout=5, allow_reconnect=False)
            js = nc.jetstream()
            try:
                streams = await js.streams_info()
                if not streams:
                    return []
                deleted = []
                for stream in streams:
                    name = stream.config.name
                    await js.delete_stream(name)
                    deleted.append(name)
                return deleted
            finally:
                await nc.close()

        try:
            deleted = async_to_sync(_delete_all_streams)()
        except Exception as e:
            raise CommandError(f"Failed to flush NATS: {e}") from e

        if deleted:
            for name in deleted:
                self.stdout.write(f"  Deleted stream: {name}")
            self.stdout.write(self.style.SUCCESS(f"Deleted {len(deleted)} stream(s)."))
        else:
            self.stdout.write("No streams found — NATS already empty.")

ami/jobs/management/commands/test_ml_job_e2e.py

Lines changed: 5 additions & 1 deletion
@@ -10,7 +10,11 @@
 
 
 class Command(BaseCommand):
-    help = "Run end-to-end test of ML job processing"
+    help = (
+        "Run end-to-end test of ML job processing.\n\n"
+        "For monitoring and debugging running jobs, see:\n"
+        "  docs/claude/reference/monitoring-async-jobs.md"
+    )
 
     def add_arguments(self, parser):
         parser.add_argument("--project", type=int, required=True, help="Project ID")

ami/jobs/models.py

Lines changed: 40 additions & 19 deletions
@@ -15,7 +15,7 @@
 
 from ami.base.models import BaseModel
 from ami.base.schemas import ConfigurableStage, ConfigurableStageParam
-from ami.jobs.tasks import run_job
+from ami.jobs.tasks import cleanup_async_job_if_needed, run_job
 from ami.main.models import Deployment, Project, SourceImage, SourceImageCollection
 from ami.ml.models import Pipeline
 from ami.ml.post_processing.registry import get_postprocessing_task
@@ -88,6 +88,11 @@ def final_states(cls):
     def failed_states(cls):
         return [cls.FAILURE, cls.REVOKED, cls.UNKNOWN]
 
+    @classmethod
+    def active_states(cls):
+        """States where a job is actively processing and should serve tasks to workers."""
+        return [cls.STARTED, cls.RETRY]
+
 
 def get_status_label(status: JobState, progress: float) -> str:
     """
@@ -331,26 +336,29 @@ def emit(self, record: logging.LogRecord):
         # Log to the current app logger
         logger.log(record.levelno, self.format(record))
 
-        # Write to the logs field on the job instance
-        timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
-        msg = f"[{timestamp}] {record.levelname} {self.format(record)}"
-        if msg not in self.job.logs.stdout:
-            self.job.logs.stdout.insert(0, msg)
+        # Write to the logs field on the job instance.
+        # Refresh from DB first to reduce the window for concurrent overwrites — each
+        # worker holds its own stale in-memory copy of `logs`, so without a refresh the
+        # last writer always wins and earlier entries are silently dropped.
+        # @TODO consider saving logs to the database periodically rather than on every log
+        try:
+            self.job.refresh_from_db(fields=["logs"])
+            timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+            msg = f"[{timestamp}] {record.levelname} {self.format(record)}"
+            if msg not in self.job.logs.stdout:
+                self.job.logs.stdout.insert(0, msg)
 
-        # Write a simpler copy of any errors to the errors field
-        if record.levelno >= logging.ERROR:
-            if record.message not in self.job.logs.stderr:
-                self.job.logs.stderr.insert(0, record.message)
+            # Write a simpler copy of any errors to the errors field
+            if record.levelno >= logging.ERROR:
+                if record.message not in self.job.logs.stderr:
+                    self.job.logs.stderr.insert(0, record.message)
 
-        if len(self.job.logs.stdout) > self.max_log_length:
-            self.job.logs.stdout = self.job.logs.stdout[: self.max_log_length]
+            if len(self.job.logs.stdout) > self.max_log_length:
+                self.job.logs.stdout = self.job.logs.stdout[: self.max_log_length]
 
-        # @TODO consider saving logs to the database periodically rather than on every log
-        try:
             self.job.save(update_fields=["logs"], update_progress=False)
         except Exception as e:
             logger.error(f"Failed to save logs for job #{self.job.pk}: {e}")
-            pass
 
 
 @dataclass
@@ -966,19 +974,28 @@ def retry(self, async_task=True):
 
     def cancel(self):
         """
-        Terminate the celery task.
+        Cancel a job. For async_api jobs, clean up NATS/Redis resources
+        and transition through CANCELING → REVOKED. For other jobs,
+        revoke the Celery task.
         """
         self.status = JobState.CANCELING
         self.save()
+
         if self.task_id:
             task = run_job.AsyncResult(self.task_id)
             if task:
                 task.revoke(terminate=True)
+            if self.dispatch_mode == JobDispatchMode.ASYNC_API:
+                # For async jobs we need to set the status to revoked here since the task already
+                # finished (it only queues the images).
+                self.status = JobState.REVOKED
             self.save()
         else:
             self.status = JobState.REVOKED
             self.save()
 
+        cleanup_async_job_if_needed(self)
+
     def update_status(self, status=None, save=True):
         """
         Update the status of the job based on the status of the celery task.
@@ -1084,11 +1101,15 @@ def get_default_progress(cls) -> JobProgress:
     def logger(self) -> logging.Logger:
         _logger = logging.getLogger(f"ami.jobs.{self.pk}")
 
-        # Only add JobLogHandler if not already present
-        if not any(isinstance(h, JobLogHandler) for h in _logger.handlers):
-            # Also log output to a field on thie model instance
+        # Update or add JobLogHandler, always pointing to the current instance.
+        # The logger is a process-level singleton so its handler may reference a stale
+        # job instance from a previous task execution in this worker process.
+        handler = next((h for h in _logger.handlers if isinstance(h, JobLogHandler)), None)
+        if handler is None:
            logger.info("Adding JobLogHandler to logger for job %s", self.pk)
            _logger.addHandler(JobLogHandler(self))
+        else:
+            handler.job = self
         _logger.propagate = False
         return _logger
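The `handler.job = self` branch above exists because Python loggers are per-process singletons, so a handler added during one task execution is still attached the next time the same job logger is requested in that worker process. A minimal illustration (the logger name format matches the diff above; `JobLogHandler` itself is not redefined here):

```python
import logging

first = logging.getLogger("ami.jobs.1234")
second = logging.getLogger("ami.jobs.1234")

# Same object both times: any JobLogHandler added via `first` is still attached when a
# later task execution asks for the logger again, so it must be re-pointed at the
# current Job instance rather than the stale one it was created with.
assert first is second
assert first.handlers is second.handlers
```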

ami/jobs/tasks.py

Lines changed: 27 additions & 9 deletions
@@ -86,10 +86,9 @@ def process_nats_pipeline_result(self, job_id: int, result_data: dict, reply_sub
 
     progress_info = state_manager.update_state(processed_image_ids, stage="process", failed_image_ids=failed_image_ids)
     if not progress_info:
-        logger.error(f"Redis state missing for job {job_id} — job may have been cleaned up prematurely.")
         # Acknowledge the task to prevent retries, since we don't know the state
         _ack_task_via_nats(reply_subject, logger)
-        # TODO: cancel the job to fail fast once PR #1144 is merged
+        _fail_job(job_id, "Redis state missing for job")
         return
 
     try:
@@ -153,8 +152,7 @@ def process_nats_pipeline_result(self, job_id: int, result_data: dict, reply_sub
     )
 
     if not progress_info:
-        job.logger.error(f"Redis state missing for job {job_id} — job may have been cleaned up prematurely.")
-        # TODO: cancel the job to fail fast once PR #1144 is merged
+        _fail_job(job_id, "Redis state missing for job")
         return
 
     # update complete state based on latest progress info after saving results
@@ -180,6 +178,26 @@ def process_nats_pipeline_result(self, job_id: int, result_data: dict, reply_sub
         job.logger.error(error)
 
 
+def _fail_job(job_id: int, reason: str) -> None:
+    from ami.jobs.models import Job, JobState
+    from ami.ml.orchestration.jobs import cleanup_async_job_resources
+
+    try:
+        with transaction.atomic():
+            job = Job.objects.select_for_update().get(pk=job_id)
+            if job.status in (JobState.CANCELING, *JobState.final_states()):
+                return
+            job.update_status(JobState.FAILURE, save=False)
+            job.finished_at = datetime.datetime.now()
+            job.save(update_fields=["status", "progress", "finished_at"])
+
+        job.logger.error(f"Job {job_id} marked as FAILURE: {reason}")
+        cleanup_async_job_resources(job.pk, job.logger)
+    except Job.DoesNotExist:
+        logger.error(f"Cannot fail job {job_id}: not found")
+        cleanup_async_job_resources(job_id, logger)
+
+
 def _ack_task_via_nats(reply_subject: str, job_logger: logging.Logger) -> None:
     try:
 
@@ -295,10 +313,10 @@ def _update_job_progress(
     # Clean up async resources for completed jobs that use NATS/Redis
     if job.progress.is_complete():
         job = Job.objects.get(pk=job_id)  # Re-fetch outside transaction
-        _cleanup_job_if_needed(job)
+        cleanup_async_job_if_needed(job)
 
 
-def _cleanup_job_if_needed(job) -> None:
+def cleanup_async_job_if_needed(job) -> None:
     """
     Clean up async resources (NATS/Redis) if this job uses them.
@@ -314,7 +332,7 @@ def _cleanup_job_if_needed(job) -> None:
     # import here to avoid circular imports
     from ami.ml.orchestration.jobs import cleanup_async_job_resources
 
-    cleanup_async_job_resources(job)
+    cleanup_async_job_resources(job.pk, job.logger)
 
 
 @task_prerun.connect(sender=run_job)
@@ -353,7 +371,7 @@ def update_job_status(sender, task_id, task, state: str, retval=None, **kwargs):
 
     # Clean up async resources for revoked jobs
     if state == JobState.REVOKED:
-        _cleanup_job_if_needed(job)
+        cleanup_async_job_if_needed(job)
 
 
 @task_failure.connect(sender=run_job, retry=False)
@@ -368,7 +386,7 @@ def update_job_failure(sender, task_id, exception, *args, **kwargs):
     job.save()
 
     # Clean up async resources for failed jobs
-    _cleanup_job_if_needed(job)
+    cleanup_async_job_if_needed(job)
 
 
 def log_time(start: float = 0, msg: str | None = None) -> tuple[float, Callable]:
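The `state_manager.update_state()` calls in the hunks above read progress counters back from Redis, and the commit history describes moving from Redis-based locking to atomic updates. A minimal sketch of that style is below; the key layout, field names, and helper function are illustrative only and not the project's actual schema:

```python
from django_redis import get_redis_connection


def record_processed(job_id: int, processed: int, failed: int) -> dict[str, int]:
    """Bump per-job progress counters without taking a distributed lock (illustrative)."""
    redis = get_redis_connection("default")
    key = f"job:{job_id}:progress"  # hypothetical key name
    pipe = redis.pipeline()
    # HINCRBY is atomic on the Redis server, so concurrent Celery workers can update
    # the same counters safely and each caller sees its own post-increment totals.
    pipe.hincrby(key, "processed", processed)
    pipe.hincrby(key, "failed", failed)
    new_processed, new_failed = pipe.execute()
    return {"processed": new_processed, "failed": new_failed}
```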

ami/jobs/tests.py

Lines changed: 3 additions & 1 deletion
@@ -445,7 +445,8 @@ def _task_batch_helper(self, value: Any, expected_status: int):
         pipeline = self._create_pipeline()
         job = self._create_ml_job("Job for batch test", pipeline)
         job.dispatch_mode = JobDispatchMode.ASYNC_API
-        job.save(update_fields=["dispatch_mode"])
+        job.status = JobState.STARTED
+        job.save(update_fields=["dispatch_mode", "status"])
         images = [
             SourceImage.objects.create(
                 path=f"image_{i}.jpg",
@@ -487,6 +488,7 @@ def test_tasks_endpoint_without_pipeline(self):
             name="Job without pipeline",
             source_image_collection=self.source_image_collection,
             dispatch_mode=JobDispatchMode.ASYNC_API,
+            status=JobState.STARTED,
         )
 
         self.client.force_authenticate(user=self.user)

ami/jobs/views.py

Lines changed: 2 additions & 2 deletions
@@ -237,8 +237,8 @@ def tasks(self, request, pk=None):
         if job.dispatch_mode != JobDispatchMode.ASYNC_API:
             raise ValidationError("Only async_api jobs have fetchable tasks")
 
-        # Don't fetch tasks from completed/failed/revoked jobs
-        if job.status in JobState.final_states():
+        # Only serve tasks for actively processing jobs
+        if job.status not in JobState.active_states():
             return Response({"tasks": []})
 
         # Validate that the job has a pipeline
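From a worker's point of view, the change above means an empty `tasks` list is also returned when the job is not in `active_states()`, while NATS problems surface as 503 (per #1135). A hedged sketch of the expected client behaviour; the URL prefix, auth scheme, and back-off values are illustrative and not taken from the project:

```python
import time

import requests


def poll_tasks(api_base: str, job_id: int, token: str) -> list[dict]:
    """Fetch a batch of tasks for one job, backing off instead of hammering the API."""
    resp = requests.get(
        f"{api_base}/jobs/{job_id}/tasks/",  # illustrative path
        headers={"Authorization": f"Token {token}"},  # illustrative auth scheme
        timeout=30,
    )
    if resp.status_code == 503:
        time.sleep(5)  # NATS outage surfaced by the endpoint: back off and retry later
        return []
    resp.raise_for_status()
    tasks = resp.json().get("tasks", [])
    if not tasks:
        time.sleep(2)  # job not active or queue drained: poll gently
    return tasks
```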
