Skip to content

Commit 0742014

Browse files
committed
Fix vLLM queue overflow with serialized semaphore release
Multiple workers could acquire the semaphore in rapid succession when the queue depth dropped, causing bursts of 1000+ page submissions and vLLM crashes. The root cause was a race condition in the semaphore release logic: multiple tasks could evaluate the release conditions and release the semaphore simultaneously, before the queue count had been updated. This change adds an asyncio.Lock() to serialize the release checks, ensuring the evaluation and the release happen atomically. All condition checks now happen inside the lock.
1 parent 3eec580 commit 0742014

File tree

1 file changed

+24
-16
lines changed

1 file changed

+24
-16
lines changed

olmocr/pipeline.py

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -632,6 +632,10 @@ def _kill_proc():
632632
running_reqs_decreased = False
633633
server_printed_ready_message = False
634634
last_semaphore_release = time.time()
635+
# Track pages submitted by last worker to ensure some complete before next release
636+
pages_submitted_at_last_release = 0
637+
# Lock to prevent multiple rapid semaphore releases
638+
release_lock = asyncio.Lock()
635639

636640
async def process_line(line):
637641
nonlocal last_running_req, last_queue_req, peak_running_req, running_reqs_decreased, last_semaphore_release, server_printed_ready_message
@@ -673,25 +677,29 @@ async def read_stream(stream):
673677
logger.warning(f"Got {ex} when reading log line from inference server, skipping")
674678

675679
async def timeout_task():
676-
nonlocal last_running_req, last_queue_req, peak_running_req, last_semaphore_release, running_reqs_decreased
680+
nonlocal last_running_req, last_queue_req, peak_running_req, last_semaphore_release, running_reqs_decreased, pages_submitted_at_last_release
681+
677682
try:
678683
while True:
679684
await asyncio.sleep(1)
680-
681-
# Check if we should release the semaphore
682-
should_release = (
683-
server_printed_ready_message
684-
and last_queue_req <= int(peak_running_req * 0.1)
685-
and time.time() - last_semaphore_release > 30
686-
and semaphore.locked()
687-
and (last_running_req == 0 or running_reqs_decreased)
688-
)
689-
690-
if should_release:
691-
semaphore.release()
692-
running_reqs_decreased = False # Reset flag after release
693-
last_semaphore_release = time.time()
694-
logger.info(f"Semaphore released at {last_running_req} running {last_queue_req} queued, peak: {peak_running_req})")
685+
686+
# Only check basic conditions before acquiring lock
687+
if server_printed_ready_message and semaphore.locked():
688+
# Use lock to prevent multiple releases
689+
async with release_lock:
690+
# Check ALL conditions inside the lock to prevent race conditions
691+
should_release = (
692+
semaphore.locked()
693+
and last_queue_req <= int(peak_running_req * 0.1) # Original logic: 10% of peak
694+
and time.time() - last_semaphore_release > 30 # Original 30s cooldown
695+
and (last_running_req == 0 or running_reqs_decreased) # Original condition
696+
)
697+
698+
if should_release:
699+
semaphore.release()
700+
running_reqs_decreased = False # Reset flag after release
701+
last_semaphore_release = time.time()
702+
logger.info(f"Semaphore released at {last_running_req} running {last_queue_req} queued, peak: {peak_running_req})")
695703
except asyncio.CancelledError:
696704
pass # Clean up if the task is cancelled
697705

0 commit comments

Comments
 (0)