Skip to content

Commit 2fb67d3

Browse files
committed
fix: use process strategy
1 parent a4e6e00 commit 2fb67d3

File tree

2 files changed

+76
-30
lines changed

2 files changed

+76
-30
lines changed

laygo/transformers/strategies/threaded.py

Lines changed: 74 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -74,52 +74,104 @@ def process_chunk(chunk: list[In]) -> list[Out]:
7474
7575
Args:
7676
chunk: The data chunk to process.
77-
shared_context: The shared context for processing.
7877
7978
Returns:
8079
The processed chunk.
8180
"""
82-
return transformer(chunk, shared_context) # type: ignore
81+
return transformer(chunk, shared_context)
8382

8483
def _ordered_generator(chunks_iter: Iterator[list[In]], executor: ThreadPoolExecutor) -> Iterator[list[Out]]:
8584
"""Generate results in their original order."""
8685
futures: deque[Future[list[Out]]] = deque()
86+
executor_shutdown = False
8787

8888
# Pre-submit initial batch of futures
89-
for _ in range(min(self.max_workers, 10)): # Limit initial submissions
89+
for _ in range(min(self.max_workers, 10)):
90+
if executor_shutdown:
91+
break
9092
try:
9193
chunk = next(chunks_iter)
9294
futures.append(executor.submit(process_chunk, chunk))
9395
except StopIteration:
9496
break
97+
except RuntimeError as e:
98+
if "cannot schedule new futures after shutdown" in str(e):
99+
executor_shutdown = True
100+
break
101+
raise
95102

96103
while futures:
97-
# Get the next result and submit the next chunk
98-
result = futures.popleft().result()
99-
yield result
100-
101104
try:
102-
chunk = next(chunks_iter)
103-
futures.append(executor.submit(process_chunk, chunk))
104-
except StopIteration:
105-
continue
105+
# Get the next result
106+
result = futures.popleft().result()
107+
yield result
108+
109+
# Try to submit the next chunk only if executor is not shutdown
110+
if not executor_shutdown:
111+
try:
112+
chunk = next(chunks_iter)
113+
futures.append(executor.submit(process_chunk, chunk))
114+
except StopIteration:
115+
continue
116+
except RuntimeError as e:
117+
if "cannot schedule new futures after shutdown" in str(e):
118+
executor_shutdown = True
119+
continue
120+
raise
121+
except Exception:
122+
# Cancel remaining futures and re-raise
123+
for future in futures:
124+
try:
125+
future.cancel()
126+
except Exception:
127+
pass # Ignore cancellation errors
128+
futures.clear()
129+
raise
106130

107131
def _unordered_generator(chunks_iter: Iterator[list[In]], executor: ThreadPoolExecutor) -> Iterator[list[Out]]:
108132
"""Generate results as they complete."""
133+
futures = set()
134+
executor_shutdown = False
135+
109136
# Pre-submit initial batch
110-
futures = {
111-
executor.submit(process_chunk, chunk) for chunk in itertools.islice(chunks_iter, min(self.max_workers, 10))
112-
}
137+
for chunk in itertools.islice(chunks_iter, min(self.max_workers, 10)):
138+
if executor_shutdown:
139+
break
140+
try:
141+
futures.add(executor.submit(process_chunk, chunk))
142+
except RuntimeError as e:
143+
if "cannot schedule new futures after shutdown" in str(e):
144+
executor_shutdown = True
145+
break
146+
raise
113147

114148
while futures:
115-
done, futures = wait(futures, return_when=FIRST_COMPLETED)
116-
for future in done:
117-
yield future.result()
118-
try:
119-
chunk = next(chunks_iter)
120-
futures.add(executor.submit(process_chunk, chunk))
121-
except StopIteration:
122-
continue
149+
try:
150+
done, futures = wait(futures, return_when=FIRST_COMPLETED)
151+
for future in done:
152+
yield future.result()
153+
154+
# Try to submit next chunk only if executor is not shutdown
155+
if not executor_shutdown:
156+
try:
157+
chunk = next(chunks_iter)
158+
futures.add(executor.submit(process_chunk, chunk))
159+
except StopIteration:
160+
continue
161+
except RuntimeError as e:
162+
if "cannot schedule new futures after shutdown" in str(e):
163+
executor_shutdown = True
164+
continue
165+
raise
166+
except Exception:
167+
# Cancel remaining futures and re-raise
168+
for future in futures:
169+
try:
170+
future.cancel()
171+
except Exception:
172+
pass # Ignore cancellation errors
173+
futures.clear()
174+
raise
123175

124176
# Use the reusable thread pool instead of creating a new one
125177
executor = self._get_thread_pool(self.max_workers)
@@ -129,10 +181,3 @@ def _unordered_generator(chunks_iter: Iterator[list[In]], executor: ThreadPoolEx
129181
# Process chunks using the reusable executor
130182
for result_chunk in gen_func(chunks_to_process, executor):
131183
yield from result_chunk
132-
133-
def __del__(self) -> None:
134-
"""Shutdown all cached thread pools. Call this during application cleanup."""
135-
with self._pool_lock:
136-
for pool in self._thread_pools.values():
137-
pool.shutdown(wait=True)
138-
self._thread_pools.clear()

laygo/transformers/transformer.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from laygo.errors import ErrorHandler
1818
from laygo.helpers import is_context_aware
1919
from laygo.helpers import is_context_aware_reduce
20+
from laygo.transformers.strategies.process import ProcessStrategy
2021
from laygo.transformers.strategies.sequential import SequentialStrategy
2122
from laygo.transformers.strategies.threaded import ThreadedStrategy
2223
from laygo.transformers.strategies.types import ExecutionStrategy
@@ -89,7 +90,7 @@ def create_process_transformer[T](
8990
"""
9091
return Transformer[T, T](
9192
chunk_size=chunk_size,
92-
strategy=ThreadedStrategy(
93+
strategy=ProcessStrategy(
9394
max_workers=max_workers,
9495
ordered=ordered,
9596
),

0 commit comments

Comments
 (0)