Skip to content

Commit 8ae4c1d

Browse files
authored
Merge pull request #19 from ringoldsdev/fix/20250801/cannot-schedule-futures-after-shutdown
fix: cancel futures after shutdown and don't schedule them
2 parents acb730f + 2fb67d3 commit 8ae4c1d

File tree

3 files changed

+128
-45
lines changed

3 files changed

+128
-45
lines changed

laygo/transformers/strategies/process.py

Lines changed: 52 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -1,9 +1,9 @@
11
from collections import deque
22
from collections.abc import Iterator
3-
from concurrent.futures import FIRST_COMPLETED
43
from concurrent.futures import wait
54
import itertools
65

6+
from loky import as_completed
77
from loky import get_reusable_executor
88

99
from laygo.context.types import IContextHandle
@@ -55,21 +55,40 @@ def _ordered_generator(
5555
executor,
5656
context_handle: IContextHandle,
5757
) -> Iterator[list[Out]]:
58-
"""Generate results in their original order."""
58+
"""Generate results in their original order, with robust error handling."""
5959
futures = deque()
60+
chunks_iter = iter(chunks_iter)
61+
62+
# Submit the initial batch of tasks
6063
for _ in range(self.max_workers + 1):
6164
try:
6265
chunk = next(chunks_iter)
6366
futures.append(executor.submit(_worker_process_chunk, transformer, context_handle, chunk))
6467
except StopIteration:
6568
break
66-
while futures:
67-
yield futures.popleft().result()
68-
try:
69-
chunk = next(chunks_iter)
70-
futures.append(executor.submit(_worker_process_chunk, transformer, context_handle, chunk))
71-
except StopIteration:
72-
continue
69+
70+
try:
71+
while futures:
72+
# Get the result of the oldest task. If it failed or the pool
73+
# is broken, .result() will raise an exception.
74+
result = futures.popleft().result()
75+
76+
# If successful, submit a new task.
77+
try:
78+
chunk = next(chunks_iter)
79+
futures.append(executor.submit(_worker_process_chunk, transformer, context_handle, chunk))
80+
except StopIteration:
81+
# No more chunks to process.
82+
pass
83+
84+
yield result
85+
finally:
86+
# This cleanup runs if the loop finishes or if an exception occurs.
87+
# It prevents orphaned processes by cancelling pending tasks.
88+
for future in futures:
89+
future.cancel()
90+
if futures:
91+
wait(list(futures))
7392

7493
def _unordered_generator(
7594
self,
@@ -78,17 +97,35 @@ def _unordered_generator(
7897
executor,
7998
context_handle: IContextHandle,
8099
) -> Iterator[list[Out]]:
81-
"""Generate results as they complete."""
100+
"""Generate results as they complete, with robust error handling."""
82101
futures = {
83102
executor.submit(_worker_process_chunk, transformer, context_handle, chunk)
84103
for chunk in itertools.islice(chunks_iter, self.max_workers + 1)
85104
}
86-
while futures:
87-
done, futures = wait(futures, return_when=FIRST_COMPLETED)
88-
for future in done:
89-
yield future.result()
105+
106+
try:
107+
# as_completed is ideal for this "process as they finish" pattern
108+
for future in as_completed(futures):
109+
# Get the result. This raises an exception if the task failed,
110+
# which immediately stops the loop and proceeds to finally.
111+
result = future.result()
112+
113+
# Remove the completed future from our tracking set
114+
futures.remove(future)
115+
116+
# Try to submit a new task to replace the one that just finished
90117
try:
91118
chunk = next(chunks_iter)
92119
futures.add(executor.submit(_worker_process_chunk, transformer, context_handle, chunk))
93120
except StopIteration:
94-
continue
121+
# No more chunks left to submit.
122+
pass
123+
124+
yield result
125+
finally:
126+
# Clean up any futures that were still running or pending when
127+
# an exception occurred or the input was exhausted.
128+
for future in futures:
129+
future.cancel()
130+
if futures:
131+
wait(futures)

laygo/transformers/strategies/threaded.py

Lines changed: 74 additions & 29 deletions
Original file line number | Diff line number | Diff line change
@@ -74,52 +74,104 @@ def process_chunk(chunk: list[In]) -> list[Out]:
7474
7575
Args:
7676
chunk: The data chunk to process.
77-
shared_context: The shared context for processing.
7877
7978
Returns:
8079
The processed chunk.
8180
"""
82-
return transformer(chunk, shared_context) # type: ignore
81+
return transformer(chunk, shared_context)
8382

8483
def _ordered_generator(chunks_iter: Iterator[list[In]], executor: ThreadPoolExecutor) -> Iterator[list[Out]]:
8584
"""Generate results in their original order."""
8685
futures: deque[Future[list[Out]]] = deque()
86+
executor_shutdown = False
8787

8888
# Pre-submit initial batch of futures
89-
for _ in range(min(self.max_workers, 10)): # Limit initial submissions
89+
for _ in range(min(self.max_workers, 10)):
90+
if executor_shutdown:
91+
break
9092
try:
9193
chunk = next(chunks_iter)
9294
futures.append(executor.submit(process_chunk, chunk))
9395
except StopIteration:
9496
break
97+
except RuntimeError as e:
98+
if "cannot schedule new futures after shutdown" in str(e):
99+
executor_shutdown = True
100+
break
101+
raise
95102

96103
while futures:
97-
# Get the next result and submit the next chunk
98-
result = futures.popleft().result()
99-
yield result
100-
101104
try:
102-
chunk = next(chunks_iter)
103-
futures.append(executor.submit(process_chunk, chunk))
104-
except StopIteration:
105-
continue
105+
# Get the next result
106+
result = futures.popleft().result()
107+
yield result
108+
109+
# Try to submit the next chunk only if executor is not shutdown
110+
if not executor_shutdown:
111+
try:
112+
chunk = next(chunks_iter)
113+
futures.append(executor.submit(process_chunk, chunk))
114+
except StopIteration:
115+
continue
116+
except RuntimeError as e:
117+
if "cannot schedule new futures after shutdown" in str(e):
118+
executor_shutdown = True
119+
continue
120+
raise
121+
except Exception:
122+
# Cancel remaining futures and re-raise
123+
for future in futures:
124+
try:
125+
future.cancel()
126+
except Exception:
127+
pass # Ignore cancellation errors
128+
futures.clear()
129+
raise
106130

107131
def _unordered_generator(chunks_iter: Iterator[list[In]], executor: ThreadPoolExecutor) -> Iterator[list[Out]]:
108132
"""Generate results as they complete."""
133+
futures = set()
134+
executor_shutdown = False
135+
109136
# Pre-submit initial batch
110-
futures = {
111-
executor.submit(process_chunk, chunk) for chunk in itertools.islice(chunks_iter, min(self.max_workers, 10))
112-
}
137+
for chunk in itertools.islice(chunks_iter, min(self.max_workers, 10)):
138+
if executor_shutdown:
139+
break
140+
try:
141+
futures.add(executor.submit(process_chunk, chunk))
142+
except RuntimeError as e:
143+
if "cannot schedule new futures after shutdown" in str(e):
144+
executor_shutdown = True
145+
break
146+
raise
113147

114148
while futures:
115-
done, futures = wait(futures, return_when=FIRST_COMPLETED)
116-
for future in done:
117-
yield future.result()
118-
try:
119-
chunk = next(chunks_iter)
120-
futures.add(executor.submit(process_chunk, chunk))
121-
except StopIteration:
122-
continue
149+
try:
150+
done, futures = wait(futures, return_when=FIRST_COMPLETED)
151+
for future in done:
152+
yield future.result()
153+
154+
# Try to submit next chunk only if executor is not shutdown
155+
if not executor_shutdown:
156+
try:
157+
chunk = next(chunks_iter)
158+
futures.add(executor.submit(process_chunk, chunk))
159+
except StopIteration:
160+
continue
161+
except RuntimeError as e:
162+
if "cannot schedule new futures after shutdown" in str(e):
163+
executor_shutdown = True
164+
continue
165+
raise
166+
except Exception:
167+
# Cancel remaining futures and re-raise
168+
for future in futures:
169+
try:
170+
future.cancel()
171+
except Exception:
172+
pass # Ignore cancellation errors
173+
futures.clear()
174+
raise
123175

124176
# Use the reusable thread pool instead of creating a new one
125177
executor = self._get_thread_pool(self.max_workers)
@@ -129,10 +181,3 @@ def _unordered_generator(chunks_iter: Iterator[list[In]], executor: ThreadPoolEx
129181
# Process chunks using the reusable executor
130182
for result_chunk in gen_func(chunks_to_process, executor):
131183
yield from result_chunk
132-
133-
def __del__(self) -> None:
134-
"""Shutdown all cached thread pools. Call this during application cleanup."""
135-
with self._pool_lock:
136-
for pool in self._thread_pools.values():
137-
pool.shutdown(wait=True)
138-
self._thread_pools.clear()

laygo/transformers/transformer.py

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -17,6 +17,7 @@
1717
from laygo.errors import ErrorHandler
1818
from laygo.helpers import is_context_aware
1919
from laygo.helpers import is_context_aware_reduce
20+
from laygo.transformers.strategies.process import ProcessStrategy
2021
from laygo.transformers.strategies.sequential import SequentialStrategy
2122
from laygo.transformers.strategies.threaded import ThreadedStrategy
2223
from laygo.transformers.strategies.types import ExecutionStrategy
@@ -89,7 +90,7 @@ def create_process_transformer[T](
8990
"""
9091
return Transformer[T, T](
9192
chunk_size=chunk_size,
92-
strategy=ThreadedStrategy(
93+
strategy=ProcessStrategy(
9394
max_workers=max_workers,
9495
ordered=ordered,
9596
),

0 commit comments

Comments (0)