diff --git a/laygo/transformers/strategies/http.py b/laygo/transformers/strategies/http.py index 5b8a9b8..d7462cb 100644 --- a/laygo/transformers/strategies/http.py +++ b/laygo/transformers/strategies/http.py @@ -1,5 +1,4 @@ from collections.abc import Callable -from collections.abc import Iterable from collections.abc import Iterator from concurrent.futures import FIRST_COMPLETED from concurrent.futures import ThreadPoolExecutor @@ -14,12 +13,20 @@ class HTTPStrategy[In, Out](ExecutionStrategy[In, Out]): - """ - An execution strategy that sends data chunks to a remote HTTP worker. - This is the CLIENT-SIDE implementation. + """An execution strategy that sends data chunks to a remote HTTP worker. + + This is the CLIENT-SIDE implementation that sends chunks to a remote + HTTP endpoint and receives the transformed results back. """ def __init__(self, worker_url: Callable[[], str], max_workers: int = 8, timeout: int = 300): + """Initialize the HTTP strategy. + + Args: + worker_url: Function that returns the URL of the remote worker endpoint. + max_workers: Maximum number of concurrent HTTP requests. + timeout: Request timeout in seconds. + """ self.worker_url = worker_url self.max_workers = max_workers self.timeout = timeout @@ -27,15 +34,30 @@ def __init__(self, worker_url: Callable[[], str], max_workers: int = 8, timeout: def execute( self, - transformer_logic: InternalTransformer[In, Out], # Note: This is ignored - chunk_generator: Callable[[Iterable[In]], Iterator[list[In]]], - data: Iterable[In], - context: IContextManager, # Note: This is also ignored - ) -> Iterator[Out]: - """Sends data to the remote worker and yields results.""" + transformer_logic: InternalTransformer[In, Out], # Ignored for HTTP strategy + chunks: Iterator[list[In]], + context: IContextManager, # Ignored for HTTP strategy + ) -> Iterator[list[Out]]: + """Send data chunks to the remote worker and yield results. + + Args: + transformer_logic: Ignored - the remote worker has the transformation logic. + chunks: Iterator of pre-chunked data. + context: Ignored - context is handled by the remote worker. + + Returns: + Iterator of transformed chunks received from the remote worker. + """ def process_chunk(chunk: list[In]) -> list[Out]: - """Sends one chunk to the worker and returns the result.""" + """Send one chunk to the worker and return the result. + + Args: + chunk: Data chunk to send to the remote worker. + + Returns: + Transformed chunk received from the remote worker. + """ try: response = self.session.post( self.worker_url(), @@ -52,12 +74,13 @@ def process_chunk(chunk: list[In]) -> list[Out]: # Use a ThreadPoolExecutor to make concurrent HTTP requests with ThreadPoolExecutor(max_workers=self.max_workers) as executor: - chunk_iterator = chunk_generator(data) + chunk_iterator = iter(chunks) futures = {executor.submit(process_chunk, chunk) for chunk in itertools.islice(chunk_iterator, self.max_workers)} + while futures: done, futures = wait(futures, return_when=FIRST_COMPLETED) for future in done: - yield from future.result() + yield future.result() try: new_chunk = next(chunk_iterator) futures.add(executor.submit(process_chunk, new_chunk)) diff --git a/laygo/transformers/strategies/process.py b/laygo/transformers/strategies/process.py index ed50e5e..83465ca 100644 --- a/laygo/transformers/strategies/process.py +++ b/laygo/transformers/strategies/process.py @@ -7,6 +7,7 @@ from loky import get_reusable_executor from laygo.context.types import IContextHandle +from laygo.context.types import IContextManager from laygo.transformers.strategies.types import ExecutionStrategy from laygo.transformers.types import InternalTransformer @@ -16,37 +17,33 @@ def _worker_process_chunk[In, Out]( context_handle: IContextHandle, chunk: list[In], ) -> list[Out]: - """ - Top-level function executed by each worker process. - It reconstructs the context proxy from the handle and runs the transformation. - """ + """Top-level function executed by each worker process.""" context_proxy = context_handle.create_proxy() try: return transformer_logic(chunk, context_proxy) finally: - # The proxy's shutdown is a no-op, but it's good practice to call it. context_proxy.shutdown() class ProcessStrategy[In, Out](ExecutionStrategy[In, Out]): + """Execute transformer logic using a process pool.""" + def __init__(self, max_workers: int = 4, ordered: bool = True): self.max_workers = max_workers self.ordered = ordered - def execute(self, transformer_logic, chunk_generator, data, context): + def execute( + self, + transformer_logic: InternalTransformer[In, Out], + chunks: Iterator[list[In]], + context: IContextManager, + ) -> Iterator[list[Out]]: """Execute the transformer by distributing chunks to a process pool.""" - - # Get the picklable handle from the context manager. context_handle = context.get_handle() - executor = get_reusable_executor(max_workers=self.max_workers) - chunks_to_process = chunk_generator(data) gen_func = self._ordered_generator if self.ordered else self._unordered_generator - - processed_chunks_iterator = gen_func(chunks_to_process, transformer_logic, executor, context_handle) - for result_chunk in processed_chunks_iterator: - yield from result_chunk + yield from gen_func(chunks, transformer_logic, executor, context_handle) def _ordered_generator( self, @@ -69,22 +66,16 @@ def _ordered_generator( try: while futures: - # Get the result of the oldest task. If it failed or the pool - # is broken, .result() will raise an exception. result = futures.popleft().result() - # If successful, submit a new task. try: chunk = next(chunks_iter) futures.append(executor.submit(_worker_process_chunk, transformer, context_handle, chunk)) except StopIteration: - # No more chunks to process. pass yield result finally: - # This cleanup runs if the loop finishes or if an exception occurs. - # It prevents orphaned processes by cancelling pending tasks. for future in futures: future.cancel() if futures: @@ -104,27 +95,18 @@ def _unordered_generator( } try: - # as_completed is ideal for this "process as they finish" pattern for future in as_completed(futures): - # Get the result. This raises an exception if the task failed, - # which immediately stops the loop and proceeds to finally. result = future.result() - - # Remove the completed future from our tracking set futures.remove(future) - # Try to submit a new task to replace the one that just finished try: chunk = next(chunks_iter) futures.add(executor.submit(_worker_process_chunk, transformer, context_handle, chunk)) except StopIteration: - # No more chunks left to submit. pass yield result finally: - # Clean up any futures that were still running or pending when - # an exception occurred or the input was exhausted. for future in futures: future.cancel() if futures: diff --git a/laygo/transformers/strategies/sequential.py b/laygo/transformers/strategies/sequential.py index a936d6e..a77576d 100644 --- a/laygo/transformers/strategies/sequential.py +++ b/laygo/transformers/strategies/sequential.py @@ -2,7 +2,7 @@ class SequentialStrategy[In, Out](ExecutionStrategy[In, Out]): - def execute(self, transformer_logic, chunk_generator, data, context): + def execute(self, transformer_logic, chunks, context): # Logic from the original Transformer.__call__ - for chunk in chunk_generator(data): - yield from transformer_logic(chunk, context) + for chunk in chunks: + yield transformer_logic(chunk, context) diff --git a/laygo/transformers/strategies/threaded.py b/laygo/transformers/strategies/threaded.py index 5e8444f..97e814e 100644 --- a/laygo/transformers/strategies/threaded.py +++ b/laygo/transformers/strategies/threaded.py @@ -1,183 +1,130 @@ from collections import deque -from collections.abc import Iterable from collections.abc import Iterator -from concurrent.futures import FIRST_COMPLETED -from concurrent.futures import Future from concurrent.futures import ThreadPoolExecutor +from concurrent.futures import as_completed from concurrent.futures import wait import itertools -import threading -from typing import ClassVar from laygo.context.types import IContextManager -from laygo.transformers.strategies.types import ChunkGenerator from laygo.transformers.strategies.types import ExecutionStrategy from laygo.transformers.types import InternalTransformer class ThreadedStrategy[In, Out](ExecutionStrategy[In, Out]): - # Class-level thread pool cache to reuse executors - _thread_pools: ClassVar[dict[int, ThreadPoolExecutor]] = {} - _pool_lock: ClassVar[threading.Lock] = threading.Lock() + """Execute transformer logic using a thread pool.""" def __init__(self, max_workers: int = 4, ordered: bool = True): + """Initialize the threaded strategy. + + Args: + max_workers: Maximum number of worker threads. + ordered: Whether to preserve order of results. + """ self.max_workers = max_workers self.ordered = ordered - @classmethod - def _get_thread_pool(cls, max_workers: int) -> ThreadPoolExecutor: - """Get or create a reusable thread pool for the given worker count.""" - with cls._pool_lock: - if max_workers not in cls._thread_pools: - cls._thread_pools[max_workers] = ThreadPoolExecutor( - max_workers=max_workers, thread_name_prefix=f"laygo-{max_workers}" - ) - return cls._thread_pools[max_workers] - - def execute(self, transformer_logic, chunk_generator, data, context): - """Execute the transformer on data concurrently. - - Uses a reusable thread pool to minimize thread creation overhead. + def execute( + self, + transformer_logic: InternalTransformer[In, Out], + chunks: Iterator[list[In]], + context: IContextManager, + ) -> Iterator[list[Out]]: + """Execute the transformer by distributing chunks to a thread pool. Args: transformer_logic: The transformation function to apply. - chunk_generator: Function to generate data chunks. - data: The input data to process. - context: Optional pipeline context for shared state. + chunks: Iterator of pre-chunked data. + context: Context manager for the execution. Returns: - An iterator over the transformed data. + Iterator of transformed chunks. """ - yield from self._execute_with_context(data, transformer_logic, context, chunk_generator) + with ThreadPoolExecutor(max_workers=self.max_workers) as executor: + gen_func = self._ordered_generator if self.ordered else self._unordered_generator + yield from gen_func(chunks, transformer_logic, executor, context) - def _execute_with_context( + def _ordered_generator( self, - data: Iterable[In], + chunks_iter: Iterator[list[In]], transformer: InternalTransformer[In, Out], - shared_context: IContextManager, - chunk_generator: ChunkGenerator[In], - ) -> Iterator[Out]: - """Execute the transformation logic with a given context. + executor: ThreadPoolExecutor, + context: IContextManager, + ) -> Iterator[list[Out]]: + """Generate results in their original order, with robust error handling. Args: - data: The input data to process. + chunks_iter: Iterator of chunks to process. transformer: The transformation function to apply. - shared_context: The shared context for the execution. - chunk_generator: Function to generate data chunks. + executor: Thread pool executor. + context: Context manager for the execution. Returns: - An iterator over the transformed data. + Iterator of transformed chunks in original order. """ + futures = deque() + chunks_iter = iter(chunks_iter) + + # Submit the initial batch of tasks + for _ in range(self.max_workers + 1): + try: + chunk = next(chunks_iter) + futures.append(executor.submit(transformer, chunk, context)) + except StopIteration: + break + + try: + while futures: + result = futures.popleft().result() - def process_chunk(chunk: list[In]) -> list[Out]: - """Process a single chunk by passing the chunk and context explicitly. + try: + chunk = next(chunks_iter) + futures.append(executor.submit(transformer, chunk, context)) + except StopIteration: + pass - Args: - chunk: The data chunk to process. + yield result + finally: + for future in futures: + future.cancel() + if futures: + wait(list(futures)) - Returns: - The processed chunk. - """ - return transformer(chunk, shared_context) + def _unordered_generator( + self, + chunks_iter: Iterator[list[In]], + transformer: InternalTransformer[In, Out], + executor: ThreadPoolExecutor, + context: IContextManager, + ) -> Iterator[list[Out]]: + """Generate results as they complete, with robust error handling. - def _ordered_generator(chunks_iter: Iterator[list[In]], executor: ThreadPoolExecutor) -> Iterator[list[Out]]: - """Generate results in their original order.""" - futures: deque[Future[list[Out]]] = deque() - executor_shutdown = False + Args: + chunks_iter: Iterator of chunks to process. + transformer: The transformation function to apply. + executor: Thread pool executor. + context: Context manager for the execution. - # Pre-submit initial batch of futures - for _ in range(min(self.max_workers, 10)): - if executor_shutdown: - break - try: - chunk = next(chunks_iter) - futures.append(executor.submit(process_chunk, chunk)) - except StopIteration: - break - except RuntimeError as e: - if "cannot schedule new futures after shutdown" in str(e): - executor_shutdown = True - break - raise + Returns: + Iterator of transformed chunks as they complete. + """ + futures = { + executor.submit(transformer, chunk, context) for chunk in itertools.islice(chunks_iter, self.max_workers + 1) + } - while futures: - try: - # Get the next result - result = futures.popleft().result() - yield result - - # Try to submit the next chunk only if executor is not shutdown - if not executor_shutdown: - try: - chunk = next(chunks_iter) - futures.append(executor.submit(process_chunk, chunk)) - except StopIteration: - continue - except RuntimeError as e: - if "cannot schedule new futures after shutdown" in str(e): - executor_shutdown = True - continue - raise - except Exception: - # Cancel remaining futures and re-raise - for future in futures: - try: - future.cancel() - except Exception: - pass # Ignore cancellation errors - futures.clear() - raise - - def _unordered_generator(chunks_iter: Iterator[list[In]], executor: ThreadPoolExecutor) -> Iterator[list[Out]]: - """Generate results as they complete.""" - futures = set() - executor_shutdown = False - - # Pre-submit initial batch - for chunk in itertools.islice(chunks_iter, min(self.max_workers, 10)): - if executor_shutdown: - break - try: - futures.add(executor.submit(process_chunk, chunk)) - except RuntimeError as e: - if "cannot schedule new futures after shutdown" in str(e): - executor_shutdown = True - break - raise + try: + for future in as_completed(futures): + result = future.result() + futures.remove(future) - while futures: try: - done, futures = wait(futures, return_when=FIRST_COMPLETED) - for future in done: - yield future.result() - - # Try to submit next chunk only if executor is not shutdown - if not executor_shutdown: - try: - chunk = next(chunks_iter) - futures.add(executor.submit(process_chunk, chunk)) - except StopIteration: - continue - except RuntimeError as e: - if "cannot schedule new futures after shutdown" in str(e): - executor_shutdown = True - continue - raise - except Exception: - # Cancel remaining futures and re-raise - for future in futures: - try: - future.cancel() - except Exception: - pass # Ignore cancellation errors - futures.clear() - raise - - # Use the reusable thread pool instead of creating a new one - executor = self._get_thread_pool(self.max_workers) - chunks_to_process = chunk_generator(data) - gen_func = _ordered_generator if self.ordered else _unordered_generator - - # Process chunks using the reusable executor - for result_chunk in gen_func(chunks_to_process, executor): - yield from result_chunk + chunk = next(chunks_iter) + futures.add(executor.submit(transformer, chunk, context)) + except StopIteration: + pass + + yield result + finally: + for future in futures: + future.cancel() + if futures: + wait(futures) diff --git a/laygo/transformers/strategies/types.py b/laygo/transformers/strategies/types.py index 47fbc34..d601026 100644 --- a/laygo/transformers/strategies/types.py +++ b/laygo/transformers/strategies/types.py @@ -1,25 +1,33 @@ from abc import ABC from abc import abstractmethod -from collections.abc import Callable -from collections.abc import Iterable from collections.abc import Iterator from laygo.context.types import IContextManager from laygo.transformers.types import InternalTransformer -type ChunkGenerator[In] = Callable[[Iterable[In]], Iterator[list[In]]] - class ExecutionStrategy[In, Out](ABC): - """Defines the contract for all execution strategies.""" + """Abstract base class for execution strategies. + + Strategies handle how transformer logic is executed (sequentially, + threaded, in processes, etc.) but do not handle chunking. + """ @abstractmethod def execute( self, transformer_logic: InternalTransformer[In, Out], - chunk_generator: Callable[[Iterable[In]], Iterator[list[In]]], - data: Iterable[In], + chunks: Iterator[list[In]], context: IContextManager, - ) -> Iterator[Out]: - """Runs the transformation logic on the data.""" - raise NotImplementedError + ) -> Iterator[list[Out]]: + """Execute transformer logic on pre-chunked data. + + Args: + transformer_logic: The transformation function to apply. + chunks: Iterator of pre-chunked data. + context: Context manager for the execution. + + Returns: + Iterator of transformed chunks. + """ + ... diff --git a/laygo/transformers/transformer.py b/laygo/transformers/transformer.py index 90cafbd..07d4599 100644 --- a/laygo/transformers/transformer.py +++ b/laygo/transformers/transformer.py @@ -22,6 +22,7 @@ from laygo.transformers.strategies.threaded import ThreadedStrategy from laygo.transformers.strategies.types import ExecutionStrategy from laygo.transformers.types import BaseTransformer +from laygo.transformers.types import In from laygo.transformers.types import InternalTransformer DEFAULT_CHUNK_SIZE = 1000 @@ -391,10 +392,16 @@ def operation(chunk: list[Out], ctx: IContextManager) -> list[Out]: return self._pipe(operation) def __call__(self, data: Iterable[In], context: IContextManager | None = None) -> Iterator[Out]: - """Execute the transformer by delegating to its strategy.""" + """Execute the transformer by first chunking data, then delegating to strategy.""" run_context = context if context is not None else self._default_context - # The new __call__ is just one line! - return self.strategy.execute(self.transformer, self._chunk_generator, data, run_context) + + # Transformer handles chunking, then passes chunks to strategy + chunks = self._chunk_generator(data) + transformed_chunks = self.strategy.execute(self.transformer, chunks, run_context) + + # Flatten the chunks back into individual elements + for chunk in transformed_chunks: + yield from chunk @overload def reduce[U](