Skip to content

Commit 3e43e4a

Browse files
committed
fix: explicit shutdown
1 parent 2c577b2 commit 3e43e4a

File tree

1 file changed

+46
-31
lines changed

1 file changed

+46
-31
lines changed

laygo/transformers/strategies/process.py

Lines changed: 46 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -57,38 +57,46 @@ def _ordered_generator(
5757
) -> Iterator[list[Out]]:
5858
"""Generate results in their original order."""
5959
futures = deque()
60+
executor_shutdown = False
6061

6162
# Pre-submit initial batch of futures
6263
for _ in range(self.max_workers + 1):
64+
if executor_shutdown:
65+
break
6366
try:
6467
chunk = next(chunks_iter)
6568
futures.append(executor.submit(_worker_process_chunk, transformer, context_handle, chunk))
6669
except StopIteration:
6770
break
6871
except RuntimeError as e:
6972
if "cannot schedule new futures after shutdown" in str(e):
73+
executor_shutdown = True
7074
break
7175
raise
7276

7377
while futures:
7478
try:
7579
yield futures.popleft().result()
7680

77-
# Try to submit the next chunk
78-
try:
79-
chunk = next(chunks_iter)
80-
futures.append(executor.submit(_worker_process_chunk, transformer, context_handle, chunk))
81-
except StopIteration:
82-
continue
83-
except RuntimeError as e:
84-
if "cannot schedule new futures after shutdown" in str(e):
85-
# Executor is shut down, stop submitting new work
86-
break
87-
raise
81+
# Try to submit the next chunk only if executor is not shutdown
82+
if not executor_shutdown:
83+
try:
84+
chunk = next(chunks_iter)
85+
futures.append(executor.submit(_worker_process_chunk, transformer, context_handle, chunk))
86+
except StopIteration:
87+
continue
88+
except RuntimeError as e:
89+
if "cannot schedule new futures after shutdown" in str(e):
90+
executor_shutdown = True
91+
continue
92+
raise
8893
except Exception:
8994
# Cancel remaining futures and re-raise
9095
for future in futures:
91-
future.cancel()
96+
try:
97+
future.cancel()
98+
except Exception:
99+
pass # Ignore cancellation errors
92100
futures.clear()
93101
raise
94102

@@ -101,37 +109,44 @@ def _unordered_generator(
101109
) -> Iterator[list[Out]]:
102110
"""Generate results as they complete."""
103111
futures = set()
112+
executor_shutdown = False
104113

105114
# Pre-submit initial batch
106-
try:
107-
for chunk in itertools.islice(chunks_iter, self.max_workers + 1):
115+
for chunk in itertools.islice(chunks_iter, self.max_workers + 1):
116+
if executor_shutdown:
117+
break
118+
try:
108119
futures.add(executor.submit(_worker_process_chunk, transformer, context_handle, chunk))
109-
except RuntimeError as e:
110-
if "cannot schedule new futures after shutdown" in str(e):
111-
# If we can't submit any futures, there's nothing to process
112-
return
113-
raise
120+
except RuntimeError as e:
121+
if "cannot schedule new futures after shutdown" in str(e):
122+
executor_shutdown = True
123+
break
124+
raise
114125

115126
while futures:
116127
try:
117128
done, futures = wait(futures, return_when=FIRST_COMPLETED)
118129
for future in done:
119130
yield future.result()
120131

121-
# Try to submit next chunk if available
122-
try:
123-
chunk = next(chunks_iter)
124-
futures.add(executor.submit(_worker_process_chunk, transformer, context_handle, chunk))
125-
except StopIteration:
126-
continue
127-
except RuntimeError as e:
128-
if "cannot schedule new futures after shutdown" in str(e):
129-
# Executor is shut down, stop submitting new work
130-
break
131-
raise
132+
# Try to submit next chunk only if executor is not shutdown
133+
if not executor_shutdown:
134+
try:
135+
chunk = next(chunks_iter)
136+
futures.add(executor.submit(_worker_process_chunk, transformer, context_handle, chunk))
137+
except StopIteration:
138+
continue
139+
except RuntimeError as e:
140+
if "cannot schedule new futures after shutdown" in str(e):
141+
executor_shutdown = True
142+
continue
143+
raise
132144
except Exception:
133145
# Cancel remaining futures and re-raise
134146
for future in futures:
135-
future.cancel()
147+
try:
148+
future.cancel()
149+
except Exception:
150+
pass # Ignore cancellation errors
136151
futures.clear()
137152
raise

0 commit comments

Comments
 (0)