diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md index 214d672..52f6e3b 100644 --- a/DEPENDENCIES.md +++ b/DEPENDENCIES.md @@ -8,7 +8,7 @@ | `httpx` | `>=0, <1` | The next generation HTTP client. | [https://pypi.org/project/httpx/](https://pypi.org/project/httpx/) | | `pandas` | `>=2, <3` | Powerful data structures for data analysis, time series, and statistics | [https://pandas.pydata.org](https://pandas.pydata.org) | | `nest-asyncio2` | `>=1.6, <2.0` | Patch asyncio to allow nested event loops | [https://github.com/Chaoses-Ib/nest-asyncio2](https://github.com/Chaoses-Ib/nest-asyncio2) | -| `rich` | `>=13.6, <14.0` | Render rich text, tables, progress bars, syntax highlighting, markdown and more to the terminal | [https://github.com/Textualize/rich](https://github.com/Textualize/rich) | +| `rich` | `>=13.6, <14.0` | Render rich text, tables, progress bars, syntax highlighting, markdown and more to the terminal | [https://pypi.org/project/rich/](https://pypi.org/project/rich/) | | `pytket` | `>=2.3.1, <3.0` | Quantum computing toolkit and interface to the TKET compiler | [https://pypi.org/project/pytket/](https://pypi.org/project/pytket/) | | `websockets` | `>11, <16` | An implementation of the WebSocket Protocol (RFC 6455 & 7692) | [https://pypi.org/project/websockets/](https://pypi.org/project/websockets/) | | `pydantic-settings` | `>=2, <3.0` | Settings management using Pydantic | [https://pypi.org/project/pydantic-settings/](https://pypi.org/project/pydantic-settings/) | diff --git a/integration/test_jobs.py b/integration/test_jobs.py index ed2ff27..2d5334a 100644 --- a/integration/test_jobs.py +++ b/integration/test_jobs.py @@ -17,6 +17,7 @@ import qnexus as qnx import qnexus.exceptions as qnx_exc +from qnexus.client.jobs import HybridStrategy, PollingStrategy from qnexus.models.job_status import JobStatusEnum from qnexus.models.references import ( CircuitRef, @@ -708,3 +709,60 @@ def test_job_cost_confidence( cost_confidence = 
def _assert_wait_for_completes(
    create_execute_job_in_project: Callable[..., ContextManager[ExecuteJobRef]],
    test_circuit: Circuit,
    strategy: PollingStrategy | HybridStrategy,
) -> None:
    """Create an execute job, wait for it with ``strategy``, and check its results.

    Shared body of the ``wait_for`` strategy tests, which differ only in the
    strategy object they pass.

    Args:
        create_execute_job_in_project: Fixture returning a context manager that
            yields the ``ExecuteJobRef`` of a freshly created execute job.
        test_circuit: Circuit fixture submitted as the job's only program.
        strategy: Wait strategy forwarded to ``qnx.jobs.wait_for``.
    """
    with create_execute_job_in_project(
        project_name=project_name,
        job_name=execute_job_name,
        circuit=test_circuit,
        circuit_name=circuit_name,
    ) as execute_job_ref:
        assert isinstance(execute_job_ref, ExecuteJobRef)

        job_status = qnx.jobs.wait_for(
            execute_job_ref,
            strategy=strategy,
            timeout=120.0,
        )

        assert job_status.status == JobStatusEnum.COMPLETED

        # One circuit was submitted, so exactly one result is expected.
        execute_results = qnx.jobs.results(execute_job_ref)
        assert len(execute_results) == 1
        assert isinstance(execute_results[0], ExecutionResultRef)


def test_wait_for_with_polling_strategy(
    create_execute_job_in_project: Callable[..., ContextManager[ExecuteJobRef]],
    test_circuit: Circuit,
) -> None:
    """Test that we can wait for a job using the polling strategy."""

    _assert_wait_for_completes(
        create_execute_job_in_project,
        test_circuit,
        strategy=PollingStrategy(),
    )


def test_wait_for_with_auto_strategy(
    create_execute_job_in_project: Callable[..., ContextManager[ExecuteJobRef]],
    test_circuit: Circuit,
) -> None:
    """Test that we can wait for a job using the auto (hybrid) strategy,
    which uses websocket initially then falls back to polling."""

    # Use a short websocket_timeout so the polling fallback can be reached.
    # The job will likely complete before the timeout, so this test does not
    # guarantee that the fallback path is exercised.
    _assert_wait_for_completes(
        create_execute_job_in_project,
        test_circuit,
        strategy=HybridStrategy(websocket_timeout=5.0),
    )
@dataclass
class WebsocketStrategy:
    """Use a websocket connection for real-time updates.

    Best for short-running jobs (<10 minutes). Carries no configuration; it
    only selects the websocket code path.
    """


@dataclass
class PollingStrategy:
    """Use exponential backoff polling.

    More robust for long-running jobs (>10 minutes).

    Attributes:
        initial_interval: Starting poll interval in seconds.
        max_interval_queued: Maximum poll interval when job is queued.
        max_interval_running: Maximum poll interval when job is running/submitted.
        backoff_factor: Multiplier for interval after each poll.

    Raises:
        ValueError: If any interval is not strictly positive, or if
            ``backoff_factor`` is below 1.
    """

    initial_interval: float = 1.0
    max_interval_queued: float = 1200.0
    max_interval_running: float = 180.0
    backoff_factor: float = 2.0

    def __post_init__(self) -> None:
        """Reject configurations that would make the poll loop spin without delay."""
        for name in (
            "initial_interval",
            "max_interval_queued",
            "max_interval_running",
        ):
            value = getattr(self, name)
            # `not value > 0` (rather than `value <= 0`) also rejects NaN.
            if not value > 0:
                raise ValueError(f"{name} must be > 0 seconds, got {value!r}")
        # The interval is multiplied by this factor after every poll. A factor
        # below 1 shrinks the sleep towards zero instead of backing off.
        if not self.backoff_factor >= 1:
            raise ValueError(
                f"backoff_factor must be >= 1, got {self.backoff_factor!r}"
            )


@dataclass
class HybridStrategy:
    """Start with websocket, fall back to polling.

    Recommended for most use cases.

    Attributes:
        websocket_timeout: How long to use websocket before switching to polling.
        polling: Configuration for the polling fallback.
    """

    websocket_timeout: float = 600.0
    # default_factory gives every HybridStrategy its own PollingStrategy
    # instead of sharing one mutable instance between all of them.
    polling: PollingStrategy = field(default_factory=PollingStrategy)
WaitStrategy = WebsocketStrategy | PollingStrategy | HybridStrategy


async def poll_job_status(
    job: JobRef,
    wait_for_status: JobStatusEnum = JobStatusEnum.COMPLETED,
    strategy: PollingStrategy | None = None,
) -> JobStatus:
    """Poll job status with exponential backoff and adaptive intervals.

    Uses different maximum poll intervals based on job state:
    - QUEUED: capped at ``strategy.max_interval_queued``
    - any other waiting state: capped at ``strategy.max_interval_running``

    Args:
        job: The job to monitor.
        wait_for_status: The status to wait for.
        strategy: Polling configuration. ``None`` (the default) uses a fresh
            ``PollingStrategy()`` with its default values.

    Returns:
        The JobStatus fetched when the job reaches ``wait_for_status`` or any
        status outside ``WAITING_STATUS``.
    """
    # Build the default per call. A dataclass instance used directly as the
    # default argument would be one shared mutable object for every caller.
    if strategy is None:
        strategy = PollingStrategy()

    interval = strategy.initial_interval
    logger.debug(
        "Starting polling for job %s (target: %s, interval: %.1fs, "
        "max queued: %.1fs, max running: %.1fs)",
        job.id,
        wait_for_status.value,
        strategy.initial_interval,
        strategy.max_interval_queued,
        strategy.max_interval_running,
    )

    while True:
        job_status = status(job)

        # Check for completion first so "next poll in ..." is never logged
        # for a status that ends the loop.
        if (
            job_status.status not in WAITING_STATUS
            or job_status.status == wait_for_status
        ):
            logger.debug("Job %s reached status: %s", job.id, job_status.status.value)
            return job_status

        # Adapt max interval based on job state
        if job_status.status == JobStatusEnum.QUEUED:
            max_interval = strategy.max_interval_queued
        else:
            max_interval = strategy.max_interval_running

        # Clamp interval to current max (allows faster polling when transitioning
        # from QUEUED to RUNNING)
        interval = min(interval, max_interval)

        logger.debug(
            "Job %s status: %s (next poll in %.1fs, max: %.1fs)",
            job.id,
            job_status.status.value,
            interval,
            max_interval,
        )

        await asyncio.sleep(interval)
        interval = min(interval * strategy.backoff_factor, max_interval)


async def hybrid_wait(
    job: JobRef,
    wait_for_status: JobStatusEnum = JobStatusEnum.COMPLETED,
    strategy: HybridStrategy | None = None,
) -> JobStatus:
    """Use websocket for initial period, then fall back to polling.

    Args:
        job: The job to monitor.
        wait_for_status: The status to wait for.
        strategy: Hybrid strategy configuration. ``None`` (the default) uses a
            fresh ``HybridStrategy()`` with its default values.

    Returns:
        The JobStatus returned by whichever phase (websocket or polling)
        finishes the wait.
    """
    # Build the default per call instead of sharing one mutable instance
    # created at function-definition time.
    if strategy is None:
        strategy = HybridStrategy()

    logger.debug(
        "Using hybrid strategy for job %s (websocket timeout: %.1fs)",
        job.id,
        strategy.websocket_timeout,
    )
    try:
        # Try websocket first with a timeout
        return await asyncio.wait_for(
            listen_job_status(job=job, wait_for_status=wait_for_status),
            timeout=strategy.websocket_timeout,
        )
    except asyncio.TimeoutError:
        # Websocket phase timed out, switch to polling
        logger.debug(
            "Job %s: websocket timeout after %.1fs, switching to polling",
            job.id,
            strategy.websocket_timeout,
        )

    # Poll outside the except block so an error raised while polling is not
    # reported as happening "during handling of" the websocket TimeoutError.
    return await poll_job_status(
        job=job, wait_for_status=wait_for_status, strategy=strategy.polling
    )
+ Robust for long jobs (>10 minutes). + - HybridStrategy(): Websocket first, then polling fallback (default). + Recommended for most use cases. + + Returns: + The final JobStatus. + + Raises: + JobError: If the job errors, is cancelled, depleted, or terminated + (unless that was the status being waited for). + asyncio.TimeoutError: If the overall timeout is exceeded. + + Examples: + # Use defaults (hybrid strategy) + wait_for(job) + + # Custom polling configuration + wait_for(job, strategy=PollingStrategy(initial_interval=5.0, backoff_factor=1.5)) + + # Custom hybrid with polling fallback config + wait_for(job, strategy=HybridStrategy( + websocket_timeout=300.0, + polling=PollingStrategy(max_interval_running=60.0) + )) + """ + logger.debug( + "Waiting for job %s with strategy=%s, timeout=%s, target=%s", + job.id, + type(strategy).__name__, + timeout, + wait_for_status.value, ) + match strategy: + case WebsocketStrategy(): + coro = listen_job_status(job=job, wait_for_status=wait_for_status) + case PollingStrategy(): + coro = poll_job_status( + job=job, wait_for_status=wait_for_status, strategy=strategy + ) + case HybridStrategy(): + coro = hybrid_wait( + job=job, wait_for_status=wait_for_status, strategy=strategy + ) + case _: + assert_never(strategy) + + if timeout is not None: + coro = asyncio.wait_for(coro, timeout=timeout) + + job_status = asyncio.run(coro) + logger.info("Job %s finished with status: %s", job.id, job_status.status.value) + if ( job_status.status == JobStatusEnum.ERROR and wait_for_status != JobStatusEnum.ERROR @@ -387,7 +592,6 @@ def status(job: JobRef, scope: ScopeFilterEnum = ScopeFilterEnum.USER) -> JobSta message=resp.text, status_code=resp.status_code ) job_status = JobStatus.from_dict(resp.json()) - # job.last_status = job_status.status return job_status @@ -397,7 +601,7 @@ async def listen_job_status( """Check the Status of a Job via a websocket connection. 
Will use SSO tokens.""" job_status = status(job) - # logger.debug("Current job status: %s", job_status.status) + logger.debug("Job %s initial status: %s", job.id, job_status.status.value) if job_status.status not in WAITING_STATUS or job_status.status == wait_for_status: return job_status @@ -418,17 +622,20 @@ def _process_exception(exc: Exception) -> Exception | None: # TODO, this cookie will expire frequently "Cookie": f"myqos_id={get_nexus_client().auth.cookies.get('myqos_id')}" # type: ignore } + logger.debug("Job %s: opening websocket connection", job.id) async for websocket in connect( f"{CONFIG.websockets_url}/api/jobs/v1beta3/{job.id}/attributes/status/ws", ssl=ssl_context, additional_headers=additional_headers, process_exception=_process_exception, - # logger=logger, + logger=logger, ): try: async for status_json in websocket: - # logger.debug("New status: %s", status_json) job_status = JobStatus.from_dict(json.loads(status_json)) + logger.debug( + "Job %s websocket update: %s", job.id, job_status.status.value + ) if ( job_status.status not in WAITING_STATUS @@ -437,9 +644,9 @@ def _process_exception(exc: Exception) -> Exception | None: break break except ConnectionClosed: - # logger.debug( - # "Websocket connection closed... attempting to reconnect..." - # ) + logger.debug( + "Job %s: websocket connection closed, attempting to reconnect", job.id + ) continue finally: try: diff --git a/qnexus/client/qir.py b/qnexus/client/qir.py index d23278d..ce74cbb 100644 --- a/qnexus/client/qir.py +++ b/qnexus/client/qir.py @@ -1,7 +1,7 @@ """Client API for QIR in Nexus.""" -import warnings import base64 +import warnings from datetime import datetime from typing import Any, Literal, Union, cast from uuid import UUID