Skip to content

Commit a4e6e00

Browse files
committed
fix: cancel futures after shutdown and dont schedule them
1 parent acb730f commit a4e6e00

File tree

1 file changed

+52
-15
lines changed

1 file changed

+52
-15
lines changed

laygo/transformers/strategies/process.py

Lines changed: 52 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
from collections import deque
22
from collections.abc import Iterator
3-
from concurrent.futures import FIRST_COMPLETED
43
from concurrent.futures import wait
54
import itertools
65

6+
from loky import as_completed
77
from loky import get_reusable_executor
88

99
from laygo.context.types import IContextHandle
@@ -55,21 +55,40 @@ def _ordered_generator(
5555
executor,
5656
context_handle: IContextHandle,
5757
) -> Iterator[list[Out]]:
58-
"""Generate results in their original order."""
58+
"""Generate results in their original order, with robust error handling."""
5959
futures = deque()
60+
chunks_iter = iter(chunks_iter)
61+
62+
# Submit the initial batch of tasks
6063
for _ in range(self.max_workers + 1):
6164
try:
6265
chunk = next(chunks_iter)
6366
futures.append(executor.submit(_worker_process_chunk, transformer, context_handle, chunk))
6467
except StopIteration:
6568
break
66-
while futures:
67-
yield futures.popleft().result()
68-
try:
69-
chunk = next(chunks_iter)
70-
futures.append(executor.submit(_worker_process_chunk, transformer, context_handle, chunk))
71-
except StopIteration:
72-
continue
69+
70+
try:
71+
while futures:
72+
# Get the result of the oldest task. If it failed or the pool
73+
# is broken, .result() will raise an exception.
74+
result = futures.popleft().result()
75+
76+
# If successful, submit a new task.
77+
try:
78+
chunk = next(chunks_iter)
79+
futures.append(executor.submit(_worker_process_chunk, transformer, context_handle, chunk))
80+
except StopIteration:
81+
# No more chunks to process.
82+
pass
83+
84+
yield result
85+
finally:
86+
# This cleanup runs if the loop finishes or if an exception occurs.
87+
# It prevents orphaned processes by cancelling pending tasks.
88+
for future in futures:
89+
future.cancel()
90+
if futures:
91+
wait(list(futures))
7392

7493
def _unordered_generator(
7594
self,
@@ -78,17 +97,35 @@ def _unordered_generator(
7897
executor,
7998
context_handle: IContextHandle,
8099
) -> Iterator[list[Out]]:
81-
"""Generate results as they complete."""
100+
"""Generate results as they complete, with robust error handling."""
82101
futures = {
83102
executor.submit(_worker_process_chunk, transformer, context_handle, chunk)
84103
for chunk in itertools.islice(chunks_iter, self.max_workers + 1)
85104
}
86-
while futures:
87-
done, futures = wait(futures, return_when=FIRST_COMPLETED)
88-
for future in done:
89-
yield future.result()
105+
106+
try:
107+
# as_completed is ideal for this "process as they finish" pattern
108+
for future in as_completed(futures):
109+
# Get the result. This raises an exception if the task failed,
110+
# which immediately stops the loop and proceeds to finally.
111+
result = future.result()
112+
113+
# Remove the completed future from our tracking set
114+
futures.remove(future)
115+
116+
# Try to submit a new task to replace the one that just finished
90117
try:
91118
chunk = next(chunks_iter)
92119
futures.add(executor.submit(_worker_process_chunk, transformer, context_handle, chunk))
93120
except StopIteration:
94-
continue
121+
# No more chunks left to submit.
122+
pass
123+
124+
yield result
125+
finally:
126+
# Clean up any futures that were still running or pending when
127+
# an exception occurred or the input was exhausted.
128+
for future in futures:
129+
future.cancel()
130+
if futures:
131+
wait(futures)

0 commit comments

Comments
 (0)