from loky import get_reusable_executor

from laygo.context.types import IContextHandle
+from laygo.context.types import IContextManager
from laygo.transformers.strategies.types import ExecutionStrategy
from laygo.transformers.types import InternalTransformer

@@ -16,37 +17,33 @@ def _worker_process_chunk[In, Out](
  context_handle: IContextHandle,
  chunk: list[In],
) -> list[Out]:
-  """
-  Top-level function executed by each worker process.
-  It reconstructs the context proxy from the handle and runs the transformation.
-  """
+  """Top-level function executed by each worker process."""
  context_proxy = context_handle.create_proxy()
  try:
    return transformer_logic(chunk, context_proxy)
  finally:
-    # The proxy's shutdown is a no-op, but it's good practice to call it.
    context_proxy.shutdown()


class ProcessStrategy[In, Out](ExecutionStrategy[In, Out]):
+  """Execute transformer logic using a process pool."""
+
  def __init__(self, max_workers: int = 4, ordered: bool = True):
    self.max_workers = max_workers
    self.ordered = ordered

-  def execute(self, transformer_logic, chunk_generator, data, context):
+  def execute(
+    self,
+    transformer_logic: InternalTransformer[In, Out],
+    chunks: Iterator[list[In]],
+    context: IContextManager,
+  ) -> Iterator[list[Out]]:
    """Execute the transformer by distributing chunks to a process pool."""
-
-    # Get the picklable handle from the context manager.
    context_handle = context.get_handle()
-
    executor = get_reusable_executor(max_workers=self.max_workers)
-    chunks_to_process = chunk_generator(data)

    gen_func = self._ordered_generator if self.ordered else self._unordered_generator
-
-    processed_chunks_iterator = gen_func(chunks_to_process, transformer_logic, executor, context_handle)
-    for result_chunk in processed_chunks_iterator:
-      yield from result_chunk
+    yield from gen_func(chunks, transformer_logic, executor, context_handle)

  def _ordered_generator(
    self,
@@ -69,22 +66,16 @@ def _ordered_generator(

    try:
      while futures:
-        # Get the result of the oldest task. If it failed or the pool
-        # is broken, .result() will raise an exception.
        result = futures.popleft().result()

-        # If successful, submit a new task.
        try:
          chunk = next(chunks_iter)
          futures.append(executor.submit(_worker_process_chunk, transformer, context_handle, chunk))
        except StopIteration:
-          # No more chunks to process.
          pass

        yield result
    finally:
-      # This cleanup runs if the loop finishes or if an exception occurs.
-      # It prevents orphaned processes by cancelling pending tasks.
      for future in futures:
        future.cancel()
      if futures:
@@ -104,27 +95,18 @@ def _unordered_generator(
    }

    try:
-      # as_completed is ideal for this "process as they finish" pattern
      for future in as_completed(futures):
-        # Get the result. This raises an exception if the task failed,
-        # which immediately stops the loop and proceeds to finally.
        result = future.result()
-
-        # Remove the completed future from our tracking set
        futures.remove(future)

-        # Try to submit a new task to replace the one that just finished
        try:
          chunk = next(chunks_iter)
          futures.add(executor.submit(_worker_process_chunk, transformer, context_handle, chunk))
        except StopIteration:
-          # No more chunks left to submit.
          pass

        yield result
    finally:
-      # Clean up any futures that were still running or pending when
-      # an exception occurred or the input was exhausted.
      for future in futures:
        future.cancel()
      if futures:
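
For reference, both generators in this diff implement the same bounded submission pattern: at most max_workers chunks are in flight, a replacement chunk is submitted each time a result is collected, and any still-pending futures are cancelled when the loop exits or a task fails. The standalone sketch below reproduces the ordered variant using only the standard library's concurrent.futures, so it can be run outside laygo; square_chunk and ordered_results are hypothetical names for illustration, and the real worker additionally rebuilds a context proxy from the picklable handle.

from collections import deque
from collections.abc import Iterator
from concurrent.futures import ProcessPoolExecutor


def square_chunk(chunk: list[int]) -> list[int]:
  # Stand-in for _worker_process_chunk; the real worker also rebuilds a context proxy.
  return [x * x for x in chunk]


def ordered_results(chunks: Iterator[list[int]], max_workers: int = 4) -> Iterator[list[int]]:
  """Yield processed chunks in input order with at most max_workers in flight."""
  with ProcessPoolExecutor(max_workers=max_workers) as executor:
    chunks_iter = iter(chunks)
    futures = deque()
    # Prime the pipeline with up to max_workers tasks.
    for _ in range(max_workers):
      try:
        futures.append(executor.submit(square_chunk, next(chunks_iter)))
      except StopIteration:
        break
    try:
      while futures:
        # Take the oldest task first so output order matches input order.
        result = futures.popleft().result()
        try:
          futures.append(executor.submit(square_chunk, next(chunks_iter)))
        except StopIteration:
          pass
        yield result
    finally:
      # Cancel anything still pending if a task failed or the consumer stopped early.
      for future in futures:
        future.cancel()


if __name__ == "__main__":
  data = ([i, i + 1] for i in range(0, 10, 2))
  for out in ordered_results(data, max_workers=2):
    print(out)

Using a deque and always resolving the oldest future first is what preserves input order; the unordered variant in the diff swaps the deque for a set and as_completed, trading ordering guarantees for lower latency on uneven chunk runtimes.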