-
Notifications
You must be signed in to change notification settings - Fork 4
feat: improved job wait_for strategies #318
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
vanyae-cqc
wants to merge
6
commits into
main
Choose a base branch
from
feat/long-running-job-monitoring
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
11605d9
feat: improved job wait_for strategies
vanyae-cqc 19f00d0
chore: write dependencies change
vanyae-cqc 2503e74
chore: switch test to execute job
vanyae-cqc 30e47a2
feat: remove wait_for timeout, increase maximum polling interval and …
vanyae-cqc 07f4545
chore: revert null logging handler
vanyae-cqc 341a101
chore: refactor into dataclasses
vanyae-cqc File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,7 +2,9 @@ | |
|
|
||
| import asyncio | ||
| import json | ||
| import logging | ||
| import ssl | ||
| from dataclasses import dataclass, field | ||
| from datetime import datetime, timezone | ||
| from enum import Enum | ||
| from typing import Any, Type, Union, cast, overload | ||
|
|
@@ -60,10 +62,11 @@ | |
| SystemRef, | ||
| WasmModuleRef, | ||
| ) | ||
|
|
||
| from qnexus.models.scope import ScopeFilterEnum | ||
| from qnexus.models.utils import assert_never | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
| EPOCH_START = datetime(1970, 1, 1, tzinfo=timezone.utc) | ||
|
|
||
|
|
||
|
|
@@ -80,6 +83,51 @@ class RemoteRetryStrategy(str, Enum): | |
| FULL_RESTART = "FULL_RESTART" | ||
|
|
||
|
|
||
| @dataclass | ||
| class WebsocketStrategy: | ||
| """Use a websocket connection for real-time updates. | ||
|
|
||
| Best for short-running jobs (<10 minutes). | ||
| """ | ||
|
|
||
|
|
||
| @dataclass | ||
| class PollingStrategy: | ||
| """Use exponential backoff polling. | ||
|
|
||
| More robust for long-running jobs (>10 minutes). | ||
|
|
||
| Attributes: | ||
| initial_interval: Starting poll interval in seconds. | ||
| max_interval_queued: Maximum poll interval when job is queued. | ||
| max_interval_running: Maximum poll interval when job is running/submitted. | ||
| backoff_factor: Multiplier for interval after each poll. | ||
| """ | ||
|
|
||
| initial_interval: float = 1.0 | ||
| max_interval_queued: float = 1200.0 | ||
| max_interval_running: float = 180.0 | ||
| backoff_factor: float = 2.0 | ||
|
|
||
|
|
||
| @dataclass | ||
| class HybridStrategy: | ||
| """Start with websocket, fall back to polling. | ||
|
|
||
| Recommended for most use cases. | ||
|
|
||
| Attributes: | ||
| websocket_timeout: How long to use websocket before switching to polling. | ||
| polling: Configuration for the polling fallback. | ||
| """ | ||
|
|
||
| websocket_timeout: float = 600.0 | ||
| polling: PollingStrategy = field(default_factory=PollingStrategy) | ||
|
|
||
|
|
||
| WaitStrategy = WebsocketStrategy | PollingStrategy | HybridStrategy | ||
|
|
||
|
|
||
| class Params( | ||
| CreatorFilter, | ||
| PropertiesFilter, | ||
|
|
@@ -338,19 +386,176 @@ def _fetch_by_id( | |
| ) | ||
|
|
||
|
|
||
| def wait_for( | ||
| async def poll_job_status( | ||
| job: JobRef, | ||
| wait_for_status: JobStatusEnum = JobStatusEnum.COMPLETED, | ||
| strategy: PollingStrategy = PollingStrategy(), | ||
| ) -> JobStatus: | ||
| """Poll job status with exponential backoff and adaptive intervals. | ||
|
|
||
| Uses different maximum poll intervals based on job state: | ||
| - QUEUED: Polls less frequently (default 20 min) since queue position changes slowly | ||
| - RUNNING/SUBMITTED: Polls more frequently (default 3 min) for responsiveness | ||
|
|
||
| Args: | ||
| job: The job to monitor. | ||
| wait_for_status: The status to wait for. | ||
| strategy: Polling configuration. | ||
|
|
||
| Returns: | ||
| The final JobStatus when the target status is reached or job terminates. | ||
| """ | ||
| interval = strategy.initial_interval | ||
| logger.debug( | ||
| "Starting polling for job %s (target: %s, interval: %.1fs, " | ||
| "max queued: %.1fs, max running: %.1fs)", | ||
| job.id, | ||
| wait_for_status.value, | ||
| strategy.initial_interval, | ||
| strategy.max_interval_queued, | ||
| strategy.max_interval_running, | ||
| ) | ||
|
|
||
| while True: | ||
| job_status = status(job) | ||
|
|
||
| # Adapt max interval based on job state | ||
| if job_status.status == JobStatusEnum.QUEUED: | ||
| max_interval = strategy.max_interval_queued | ||
| else: | ||
| max_interval = strategy.max_interval_running | ||
|
|
||
| # Clamp interval to current max (allows faster polling when transitioning | ||
| # from QUEUED to RUNNING) | ||
| interval = min(interval, max_interval) | ||
|
|
||
| logger.debug( | ||
| "Job %s status: %s (next poll in %.1fs, max: %.1fs)", | ||
| job.id, | ||
| job_status.status.value, | ||
| interval, | ||
| max_interval, | ||
| ) | ||
|
|
||
| if ( | ||
| job_status.status not in WAITING_STATUS | ||
| or job_status.status == wait_for_status | ||
| ): | ||
| logger.debug("Job %s reached status: %s", job.id, job_status.status.value) | ||
| return job_status | ||
|
|
||
| await asyncio.sleep(interval) | ||
| interval = min(interval * strategy.backoff_factor, max_interval) | ||
|
|
||
|
|
||
| async def hybrid_wait( | ||
| job: JobRef, | ||
| wait_for_status: JobStatusEnum = JobStatusEnum.COMPLETED, | ||
| timeout: float | None = 900.0, | ||
| strategy: HybridStrategy = HybridStrategy(), | ||
| ) -> JobStatus: | ||
| """Check job status until the job is complete (or a specified status).""" | ||
| job_status = asyncio.run( | ||
| asyncio.wait_for( | ||
| """Use websocket for initial period, then fall back to polling. | ||
|
|
||
| Args: | ||
| job: The job to monitor. | ||
| wait_for_status: The status to wait for. | ||
| strategy: Hybrid strategy configuration. | ||
|
|
||
| Returns: | ||
| The final JobStatus when the target status is reached or job terminates. | ||
| """ | ||
| logger.debug( | ||
| "Using hybrid strategy for job %s (websocket timeout: %.1fs)", | ||
| job.id, | ||
| strategy.websocket_timeout, | ||
| ) | ||
| try: | ||
| # Try websocket first with a timeout | ||
| job_status = await asyncio.wait_for( | ||
| listen_job_status(job=job, wait_for_status=wait_for_status), | ||
| timeout=timeout, | ||
| timeout=strategy.websocket_timeout, | ||
| ) | ||
| return job_status | ||
| except asyncio.TimeoutError: | ||
| # Websocket phase timed out, switch to polling | ||
| logger.debug( | ||
| "Job %s: websocket timeout after %.1fs, switching to polling", | ||
| job.id, | ||
| strategy.websocket_timeout, | ||
| ) | ||
| return await poll_job_status( | ||
| job=job, wait_for_status=wait_for_status, strategy=strategy.polling | ||
| ) | ||
|
|
||
|
|
||
| def wait_for( | ||
| job: JobRef, | ||
| wait_for_status: JobStatusEnum = JobStatusEnum.COMPLETED, | ||
| timeout: float | None = None, | ||
| strategy: WaitStrategy = HybridStrategy(), | ||
| ) -> JobStatus: | ||
| """Check job status until the job is complete (or a specified status). | ||
|
|
||
| Args: | ||
| job: The job to monitor. | ||
| wait_for_status: The status to wait for (default: COMPLETED). | ||
| timeout: Overall timeout in seconds. None for no timeout (default: None). | ||
| strategy: How to monitor the job: | ||
| - WebsocketStrategy(): Real-time updates via websocket. | ||
| Best for short jobs (<10 minutes). | ||
| - PollingStrategy(): Exponential backoff polling. | ||
| Robust for long jobs (>10 minutes). | ||
| - HybridStrategy(): Websocket first, then polling fallback (default). | ||
| Recommended for most use cases. | ||
|
|
||
| Returns: | ||
| The final JobStatus. | ||
|
|
||
| Raises: | ||
| JobError: If the job errors, is cancelled, depleted, or terminated | ||
| (unless that was the status being waited for). | ||
| asyncio.TimeoutError: If the overall timeout is exceeded. | ||
|
|
||
| Examples: | ||
| # Use defaults (hybrid strategy) | ||
| wait_for(job) | ||
|
|
||
| # Custom polling configuration | ||
| wait_for(job, strategy=PollingStrategy(initial_interval=5.0, backoff_factor=1.5)) | ||
|
|
||
| # Custom hybrid with polling fallback config | ||
| wait_for(job, strategy=HybridStrategy( | ||
| websocket_timeout=300.0, | ||
| polling=PollingStrategy(max_interval_running=60.0) | ||
| )) | ||
| """ | ||
| logger.debug( | ||
| "Waiting for job %s with strategy=%s, timeout=%s, target=%s", | ||
| job.id, | ||
| type(strategy).__name__, | ||
| timeout, | ||
| wait_for_status.value, | ||
| ) | ||
|
|
||
| match strategy: | ||
| case WebsocketStrategy(): | ||
| coro = listen_job_status(job=job, wait_for_status=wait_for_status) | ||
| case PollingStrategy(): | ||
| coro = poll_job_status( | ||
| job=job, wait_for_status=wait_for_status, strategy=strategy | ||
| ) | ||
| case HybridStrategy(): | ||
| coro = hybrid_wait( | ||
| job=job, wait_for_status=wait_for_status, strategy=strategy | ||
| ) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Instead of an
|
||
| case _: | ||
| assert_never(strategy) | ||
|
|
||
| if timeout is not None: | ||
| coro = asyncio.wait_for(coro, timeout=timeout) | ||
|
|
||
| job_status = asyncio.run(coro) | ||
| logger.info("Job %s finished with status: %s", job.id, job_status.status.value) | ||
|
|
||
| if ( | ||
| job_status.status == JobStatusEnum.ERROR | ||
| and wait_for_status != JobStatusEnum.ERROR | ||
|
|
@@ -387,7 +592,6 @@ def status(job: JobRef, scope: ScopeFilterEnum = ScopeFilterEnum.USER) -> JobSta | |
| message=resp.text, status_code=resp.status_code | ||
| ) | ||
| job_status = JobStatus.from_dict(resp.json()) | ||
| # job.last_status = job_status.status | ||
| return job_status | ||
|
|
||
|
|
||
|
|
@@ -397,7 +601,7 @@ async def listen_job_status( | |
| """Check the Status of a Job via a websocket connection. | ||
| Will use SSO tokens.""" | ||
| job_status = status(job) | ||
| # logger.debug("Current job status: %s", job_status.status) | ||
| logger.debug("Job %s initial status: %s", job.id, job_status.status.value) | ||
| if job_status.status not in WAITING_STATUS or job_status.status == wait_for_status: | ||
| return job_status | ||
|
|
||
|
|
@@ -418,17 +622,20 @@ def _process_exception(exc: Exception) -> Exception | None: | |
| # TODO, this cookie will expire frequently | ||
| "Cookie": f"myqos_id={get_nexus_client().auth.cookies.get('myqos_id')}" # type: ignore | ||
| } | ||
| logger.debug("Job %s: opening websocket connection", job.id) | ||
| async for websocket in connect( | ||
| f"{CONFIG.websockets_url}/api/jobs/v1beta3/{job.id}/attributes/status/ws", | ||
| ssl=ssl_context, | ||
| additional_headers=additional_headers, | ||
| process_exception=_process_exception, | ||
| # logger=logger, | ||
| logger=logger, | ||
| ): | ||
| try: | ||
| async for status_json in websocket: | ||
| # logger.debug("New status: %s", status_json) | ||
| job_status = JobStatus.from_dict(json.loads(status_json)) | ||
| logger.debug( | ||
| "Job %s websocket update: %s", job.id, job_status.status.value | ||
| ) | ||
|
|
||
| if ( | ||
| job_status.status not in WAITING_STATUS | ||
|
|
@@ -437,9 +644,9 @@ def _process_exception(exc: Exception) -> Exception | None: | |
| break | ||
| break | ||
| except ConnectionClosed: | ||
| # logger.debug( | ||
| # "Websocket connection closed... attempting to reconnect..." | ||
| # ) | ||
| logger.debug( | ||
| "Job %s: websocket connection closed, attempting to reconnect", job.id | ||
| ) | ||
| continue | ||
| finally: | ||
| try: | ||
|
|
||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great idea