|
8 | 8 | import asyncio |
9 | 9 | import json |
10 | 10 | import logging |
| 11 | +import time |
11 | 12 | import traceback |
12 | 13 | from collections.abc import Awaitable, Callable |
13 | 14 | from typing import TYPE_CHECKING, Any |
|
17 | 18 |
|
18 | 19 | logger = logging.getLogger(__name__) |
19 | 20 |
|
| 21 | +# Progress logging interval in seconds |
| 22 | +PROGRESS_LOG_INTERVAL = 30 |
| 23 | + |
20 | 24 |
|
21 | 25 | def fq_table(table: str, schema: str | None = None) -> str: |
22 | 26 | """Get fully-qualified table name with optional schema prefix.""" |
@@ -66,6 +70,9 @@ def __init__( |
66 | 70 | self._current_tasks: set[asyncio.Task] = set() |
67 | 71 | self._in_flight_count = 0 |
68 | 72 | self._in_flight_lock = asyncio.Lock() |
| 73 | + self._last_progress_log = 0.0 |
| 74 | + self._tasks_completed_since_log = 0 |
| 75 | + self._active_banks: set[str] = set() |
69 | 76 |
|
70 | 77 | async def claim_batch(self) -> list[tuple[str, dict[str, Any]]]: |
71 | 78 | """ |
@@ -281,6 +288,9 @@ async def run(self): |
281 | 288 | except asyncio.TimeoutError: |
282 | 289 | pass # Normal timeout, continue polling |
283 | 290 |
|
| 291 | + # Log progress stats periodically |
| 292 | + await self._log_progress_if_due() |
| 293 | + |
284 | 294 | except asyncio.CancelledError: |
285 | 295 | logger.info(f"Worker {self._worker_id} polling loop cancelled") |
286 | 296 | break |
@@ -317,6 +327,72 @@ async def shutdown_graceful(self, timeout: float = 30.0): |
317 | 327 |
|
318 | 328 | logger.warning(f"Worker {self._worker_id} shutdown timeout after {timeout}s") |
319 | 329 |
|
async def _log_progress_if_due(self):
    """Log queue progress stats at most once per PROGRESS_LOG_INTERVAL seconds.

    Called on every iteration of the polling loop; returns immediately when
    the interval has not yet elapsed. When due, it runs two aggregate queries
    against the ``async_operations`` table and emits one ``[WORKER_STATS]``
    line at INFO level.

    Best-effort: any exception raised while collecting or formatting the
    stats is logged at DEBUG level and suppressed, so a stats failure does
    not propagate to the caller.
    """
    # Use the monotonic clock for interval measurement: wall-clock time can
    # jump backwards (NTP step, manual change), which would silence progress
    # logging until the clock catches up again.
    now = time.monotonic()
    if now - self._last_progress_log < PROGRESS_LOG_INTERVAL:
        return

    # Record the attempt before querying so that a failing database is
    # retried once per interval rather than on every polling iteration.
    self._last_progress_log = now

    try:
        table = fq_table("async_operations", self._schema)
        async with self._pool.acquire() as conn:
            # Per-status counts, limited to rows created in the last 24 hours.
            status_rows = await conn.fetch(
                f"""
                SELECT status, COUNT(*) as count
                FROM {table}
                WHERE created_at > now() - interval '24 hours'
                GROUP BY status
                """
            )

            # Currently processing tasks grouped by type and bank.
            # NOTE: unlike the query above, this one has no 24-hour filter,
            # so its rows can add up to more than the "processing" count.
            processing_rows = await conn.fetch(
                f"""
                SELECT operation_type, bank_id, COUNT(*) as count
                FROM {table}
                WHERE status = 'processing'
                GROUP BY operation_type, bank_id
                """
            )

        # The connection is back in the pool from here on; everything below
        # works on the already-fetched rows.
        status_counts = {row["status"]: row["count"] for row in status_rows}
        pending = status_counts.get("pending", 0)
        processing_count = status_counts.get("processing", 0)
        completed = status_counts.get("completed", 0)
        failed = status_counts.get("failed", 0)

        # One "type:bank(count)" entry per processing group.
        processing_info = [
            f"{row['operation_type']}:{row['bank_id']}({row['count']})"
            for row in processing_rows
        ]

        # Read this worker's in-flight counter under its lock.
        async with self._in_flight_lock:
            in_flight = self._in_flight_count

        # Show at most 10 groups to keep the log line bounded.
        processing_str = ", ".join(processing_info[:10]) if processing_info else "none"
        if len(processing_info) > 10:
            processing_str += f" +{len(processing_info) - 10} more"

        logger.info(
            f"[WORKER_STATS] worker={self._worker_id} in_flight={in_flight} | "
            f"global: pending={pending} processing={processing_count} "
            f"completed_24h={completed} failed_24h={failed} | "
            f"active: {processing_str}"
        )

    except Exception as e:
        # Deliberately best-effort: stats logging must never break the worker.
        logger.debug(f"Failed to log progress stats: {e}")
| 395 | + |
320 | 396 | @property |
321 | 397 | def worker_id(self) -> str: |
322 | 398 | """Get the worker ID.""" |
|
0 commit comments