
Commit 61b278c

feat: add queue to async client
1 parent 381c146 commit 61b278c

File tree

2 files changed: +154 −3


scrapegraph-py/scrapegraph_py/async_client.py

+153 −2
@@ -1,5 +1,9 @@
 import asyncio
-from typing import Any, Optional
+from typing import Any, Optional, Dict, Callable, Awaitable, TypeVar, Generic
+from enum import Enum
+from dataclasses import dataclass
+from datetime import datetime
+from uuid import uuid4
 
 from aiohttp import ClientSession, ClientTimeout, TCPConnector
 from aiohttp.client_exceptions import ClientError
@@ -20,6 +24,26 @@
 )
 from scrapegraph_py.utils.helpers import handle_async_response, validate_api_key
 
+T = TypeVar('T')
+
+class JobStatus(Enum):
+    PENDING = "pending"
+    RUNNING = "running"
+    COMPLETED = "completed"
+    FAILED = "failed"
+
+@dataclass
+class Job(Generic[T]):
+    id: str
+    status: JobStatus
+    created_at: datetime
+    started_at: Optional[datetime] = None
+    completed_at: Optional[datetime] = None
+    result: Optional[T] = None
+    error: Optional[Exception] = None
+    task: Optional[Callable[..., Awaitable[T]]] = None
+    args: tuple = ()
+    kwargs: Optional[dict] = None
 
 class AsyncClient:
     @classmethod
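The Job record is generic over its result type, so a queued task's return value keeps its type through the queue. Below is a minimal sketch of the lifecycle the processor drives, using only the definitions above; the fetch coroutine and URL are illustrative stand-ins, not part of the commit.

import asyncio
from datetime import datetime
from uuid import uuid4

async def fetch(url: str) -> str:
    # Illustrative stand-in for a real scraping task
    await asyncio.sleep(0.1)
    return f"scraped {url}"

async def main() -> None:
    job: Job[str] = Job(
        id=str(uuid4()),
        status=JobStatus.PENDING,
        created_at=datetime.now(),
        task=fetch,
        args=("https://example.com",),
    )
    # The same transitions _process_queue applies to each dequeued job:
    job.status = JobStatus.RUNNING
    job.started_at = datetime.now()
    job.result = await job.task(*job.args, **(job.kwargs or {}))
    job.status = JobStatus.COMPLETED
    job.completed_at = datetime.now()
    print(job.status.value, job.result)  # completed scraped https://example.com

asyncio.run(main())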
@@ -58,6 +82,7 @@ def __init__(
         timeout: Optional[float] = None,
         max_retries: int = 3,
         retry_delay: float = 1.0,
+        max_queue_size: int = 1000,
     ):
         """Initialize AsyncClient with configurable parameters.
 
@@ -67,6 +92,7 @@ def __init__(
             timeout: Request timeout in seconds. None means no timeout (infinite)
             max_retries: Maximum number of retry attempts
             retry_delay: Delay between retries in seconds
+            max_queue_size: Maximum number of jobs in the queue
         """
         logger.info("🔑 Initializing AsyncClient")
 
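A note on the new max_queue_size parameter: asyncio.Queue enforces the limit by suspending put() callers once the queue is full, so submit_job applies backpressure rather than raising. A self-contained sketch of that behavior (plain asyncio, not part of the commit):

import asyncio

async def demo_backpressure() -> None:
    q: asyncio.Queue[int] = asyncio.Queue(maxsize=1)
    await q.put(1)  # queue is now full
    try:
        # With no consumer running, a second put() suspends until timeout
        await asyncio.wait_for(q.put(2), timeout=0.1)
    except asyncio.TimeoutError:
        print("queue full: put() blocked until a consumer frees a slot")

asyncio.run(demo_backpressure())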
@@ -96,8 +122,132 @@ def __init__(
             headers=self.headers, connector=TCPConnector(ssl=ssl), timeout=self.timeout
         )
 
+        # Initialize job queue
+        self.job_queue: asyncio.Queue[Job] = asyncio.Queue(maxsize=max_queue_size)
+        self.jobs: Dict[str, Job] = {}
+        self._queue_processor_task = None
+
         logger.info("✅ AsyncClient initialized successfully")
 
+    async def start_queue_processor(self):
+        """Start the background job queue processor."""
+        if self._queue_processor_task is None:
+            self._queue_processor_task = asyncio.create_task(self._process_queue())
+            logger.info("🚀 Job queue processor started")
+
+    async def stop_queue_processor(self):
+        """Stop the background job queue processor."""
+        if self._queue_processor_task is not None:
+            self._queue_processor_task.cancel()
+            try:
+                await self._queue_processor_task
+            except asyncio.CancelledError:
+                pass
+            self._queue_processor_task = None
+            logger.info("⏹️ Job queue processor stopped")
+
+    async def _process_queue(self):
+        """Process jobs from the queue."""
+        while True:
+            try:
+                job = await self.job_queue.get()
+                job.status = JobStatus.RUNNING
+                job.started_at = datetime.now()
+
+                try:
+                    if job.task:
+                        job.result = await job.task(*job.args, **(job.kwargs or {}))
+                    job.status = JobStatus.COMPLETED
+                except Exception as e:
+                    job.error = e
+                    job.status = JobStatus.FAILED
+                    logger.error(f"❌ Job {job.id} failed: {str(e)}")
+                finally:
+                    job.completed_at = datetime.now()
+                    self.job_queue.task_done()
+
+            except asyncio.CancelledError:
+                break
+            except Exception as e:
+                logger.error(f"❌ Queue processor error: {str(e)}")
+
+    async def submit_job(self, task: Callable[..., Awaitable[T]], *args, **kwargs) -> str:
+        """Submit a new job to the queue.
+
+        Args:
+            task: Async function to execute
+            *args: Positional arguments for the task
+            **kwargs: Keyword arguments for the task
+
+        Returns:
+            str: Job ID
+        """
+        job_id = str(uuid4())
+        job = Job(
+            id=job_id,
+            status=JobStatus.PENDING,
+            created_at=datetime.now(),
+            task=task,
+            args=args,
+            kwargs=kwargs
+        )
+
+        self.jobs[job_id] = job
+        await self.job_queue.put(job)
+        logger.info(f"📋 Job {job_id} submitted to queue")
+
+        # Ensure queue processor is running
+        if self._queue_processor_task is None:
+            await self.start_queue_processor()
+
+        return job_id
+
+    async def get_job_status(self, job_id: str) -> Dict[str, Any]:
+        """Get the status of a job.
+
+        Args:
+            job_id: The ID of the job to check
+
+        Returns:
+            Dict containing job status information
+        """
+        if job_id not in self.jobs:
+            raise ValueError(f"Job {job_id} not found")
+
+        job = self.jobs[job_id]
+        return {
+            "id": job.id,
+            "status": job.status.value,
+            "created_at": job.created_at,
+            "started_at": job.started_at,
+            "completed_at": job.completed_at,
+            "result": job.result,
+            "error": str(job.error) if job.error else None
+        }
+
+    async def wait_for_job(self, job_id: str, timeout: Optional[float] = None) -> Any:
+        """Wait for a job to complete and return its result.
+
+        Args:
+            job_id: The ID of the job to wait for
+            timeout: Maximum time to wait in seconds; raises asyncio.TimeoutError when exceeded
+
+        Returns:
+            The result of the job
+        """
+        if job_id not in self.jobs:
+            raise ValueError(f"Job {job_id} not found")
+
+        job = self.jobs[job_id]
+        deadline = None if timeout is None else asyncio.get_running_loop().time() + timeout
+
+        while job.status in (JobStatus.PENDING, JobStatus.RUNNING):
+            if deadline is not None and asyncio.get_running_loop().time() >= deadline:
+                raise asyncio.TimeoutError(f"Timed out waiting for job {job_id}")
+            await asyncio.sleep(0.1)
+
+        if job.error:
+            raise job.error
+
+        return job.result
+
     async def _make_request(self, method: str, url: str, **kwargs) -> Any:
         """Make HTTP request with retry logic."""
         for attempt in range(self.max_retries):
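Taken together, submit_job, get_job_status, and wait_for_job form the public surface of the queue. A usage sketch follows; the AsyncClient constructor arguments are an assumption based on the surrounding code, and my_task is an arbitrary coroutine standing in for real work:

import asyncio
from scrapegraph_py.async_client import AsyncClient

async def my_task(x: int) -> int:
    # Any awaitable works as a job; this one just doubles its input
    await asyncio.sleep(0.2)
    return x * 2

async def main() -> None:
    client = AsyncClient(api_key="your-api-key")  # assumed constructor signature
    try:
        job_id = await client.submit_job(my_task, 21)  # lazily starts the processor
        print(await client.get_job_status(job_id))     # status: pending or running
        result = await client.wait_for_job(job_id, timeout=5.0)
        print(result)  # 42
    finally:
        await client.close()  # stops the processor, then closes the session

asyncio.run(main())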
@@ -285,8 +435,9 @@ async def get_searchscraper(self, request_id: str):
         return result
 
     async def close(self):
-        """Close the session to free up resources"""
+        """Close the session and stop the queue processor."""
         logger.info("🔒 Closing AsyncClient session")
+        await self.stop_queue_processor()
         await self.session.close()
         logger.debug("✅ Session closed successfully")
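One consequence of wiring stop_queue_processor into close(): cancelling the processor can interrupt a job that is still RUNNING and leave PENDING jobs unprocessed. Because _process_queue calls task_done() for every job, a caller that wants a graceful shutdown can drain the queue first with asyncio.Queue.join(), as in this sketch (an assumption, not part of the commit):

from scrapegraph_py.async_client import AsyncClient

async def drain_and_close(client: AsyncClient) -> None:
    # join() returns once task_done() has been called for every queued job
    await client.job_queue.join()
    await client.close()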

scrapegraph-py/uv.lock

+1 −1
Generated file; diff not rendered.
