Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 36 additions & 13 deletions laygo/transformers/strategies/http.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from collections.abc import Callable
from collections.abc import Iterable
from collections.abc import Iterator
from concurrent.futures import FIRST_COMPLETED
from concurrent.futures import ThreadPoolExecutor
Expand All @@ -14,28 +13,51 @@


class HTTPStrategy[In, Out](ExecutionStrategy[In, Out]):
"""
An execution strategy that sends data chunks to a remote HTTP worker.
This is the CLIENT-SIDE implementation.
"""An execution strategy that sends data chunks to a remote HTTP worker.

This is the CLIENT-SIDE implementation that sends chunks to a remote
HTTP endpoint and receives the transformed results back.
"""

def __init__(self, worker_url: Callable[[], str], max_workers: int = 8, timeout: int = 300):
    """Configure the client side of the HTTP strategy.

    Args:
        worker_url: Zero-argument callable returning the URL of the remote
            worker endpoint.
        max_workers: Upper bound on the number of concurrent HTTP requests.
        timeout: Request timeout in seconds.
    """
    # A single Session is created here and stored for the lifetime of the strategy.
    self.session = requests.Session()
    self.timeout = timeout
    self.max_workers = max_workers
    self.worker_url = worker_url

def execute(
self,
transformer_logic: InternalTransformer[In, Out], # Note: This is ignored
chunk_generator: Callable[[Iterable[In]], Iterator[list[In]]],
data: Iterable[In],
context: IContextManager, # Note: This is also ignored
) -> Iterator[Out]:
"""Sends data to the remote worker and yields results."""
transformer_logic: InternalTransformer[In, Out], # Ignored for HTTP strategy
chunks: Iterator[list[In]],
context: IContextManager, # Ignored for HTTP strategy
) -> Iterator[list[Out]]:
"""Send data chunks to the remote worker and yield results.

Args:
transformer_logic: Ignored - the remote worker has the transformation logic.
chunks: Iterator of pre-chunked data.
context: Ignored - context is handled by the remote worker.

Returns:
Iterator of transformed chunks received from the remote worker.
"""

def process_chunk(chunk: list[In]) -> list[Out]:
"""Sends one chunk to the worker and returns the result."""
"""Send one chunk to the worker and return the result.

Args:
chunk: Data chunk to send to the remote worker.

Returns:
Transformed chunk received from the remote worker.
"""
try:
response = self.session.post(
self.worker_url(),
Expand All @@ -52,12 +74,13 @@ def process_chunk(chunk: list[In]) -> list[Out]:

# Use a ThreadPoolExecutor to make concurrent HTTP requests
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
chunk_iterator = chunk_generator(data)
chunk_iterator = iter(chunks)
futures = {executor.submit(process_chunk, chunk) for chunk in itertools.islice(chunk_iterator, self.max_workers)}

while futures:
done, futures = wait(futures, return_when=FIRST_COMPLETED)
for future in done:
yield from future.result()
yield future.result()
try:
new_chunk = next(chunk_iterator)
futures.add(executor.submit(process_chunk, new_chunk))
Expand Down
40 changes: 11 additions & 29 deletions laygo/transformers/strategies/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from loky import get_reusable_executor

from laygo.context.types import IContextHandle
from laygo.context.types import IContextManager
from laygo.transformers.strategies.types import ExecutionStrategy
from laygo.transformers.types import InternalTransformer

Expand All @@ -16,37 +17,33 @@ def _worker_process_chunk[In, Out](
context_handle: IContextHandle,
chunk: list[In],
) -> list[Out]:
"""
Top-level function executed by each worker process.
It reconstructs the context proxy from the handle and runs the transformation.
"""
"""Top-level function executed by each worker process."""
context_proxy = context_handle.create_proxy()
try:
return transformer_logic(chunk, context_proxy)
finally:
# The proxy's shutdown is a no-op, but it's good practice to call it.
context_proxy.shutdown()


class ProcessStrategy[In, Out](ExecutionStrategy[In, Out]):
"""Execute transformer logic using a process pool."""

def __init__(self, max_workers: int = 4, ordered: bool = True):
    """Record the pool size and the result-ordering mode.

    Args:
        max_workers: Worker count handed to the reusable executor.
        ordered: When true, ``execute`` drives the pool through the ordered
            generator; otherwise through the unordered one.
    """
    self.ordered = ordered
    self.max_workers = max_workers

# NOTE(review): this span is a rendered diff hunk, so two versions of `execute`
# appear back to back. Presumably the untyped signature and the lines that use
# `chunk_generator(data)` are the removed side -- confirm against the merged file.
def execute(self, transformer_logic, chunk_generator, data, context):
def execute(
self,
transformer_logic: InternalTransformer[In, Out],
chunks: Iterator[list[In]],
context: IContextManager,
) -> Iterator[list[Out]]:
"""Execute the transformer by distributing chunks to a process pool."""

# Get the picklable handle from the context manager.
context_handle = context.get_handle()

# The executor is sized by the max_workers value stored in __init__.
executor = get_reusable_executor(max_workers=self.max_workers)
# Variant that builds the chunk iterator itself from the raw data.
chunks_to_process = chunk_generator(data)

# self.ordered selects which generator method drives the pool.
gen_func = self._ordered_generator if self.ordered else self._unordered_generator

# Variant that flattens every result chunk into individual items.
processed_chunks_iterator = gen_func(chunks_to_process, transformer_logic, executor, context_handle)
for result_chunk in processed_chunks_iterator:
yield from result_chunk
# Variant that takes pre-built chunks and yields each result chunk whole,
# which is what the Iterator[list[Out]] return annotation describes.
yield from gen_func(chunks, transformer_logic, executor, context_handle)

def _ordered_generator(
self,
Expand All @@ -69,22 +66,16 @@ def _ordered_generator(

try:
while futures:
# Get the result of the oldest task. If it failed or the pool
# is broken, .result() will raise an exception.
result = futures.popleft().result()

# If successful, submit a new task.
try:
chunk = next(chunks_iter)
futures.append(executor.submit(_worker_process_chunk, transformer, context_handle, chunk))
except StopIteration:
# No more chunks to process.
pass

yield result
finally:
# This cleanup runs if the loop finishes or if an exception occurs.
# It prevents orphaned processes by cancelling pending tasks.
for future in futures:
future.cancel()
if futures:
Expand All @@ -104,27 +95,18 @@ def _unordered_generator(
}

try:
# as_completed is ideal for this "process as they finish" pattern
for future in as_completed(futures):
# Get the result. This raises an exception if the task failed,
# which immediately stops the loop and proceeds to finally.
result = future.result()

# Remove the completed future from our tracking set
futures.remove(future)

# Try to submit a new task to replace the one that just finished
try:
chunk = next(chunks_iter)
futures.add(executor.submit(_worker_process_chunk, transformer, context_handle, chunk))
except StopIteration:
# No more chunks left to submit.
pass

yield result
finally:
# Clean up any futures that were still running or pending when
# an exception occurred or the input was exhausted.
for future in futures:
future.cancel()
if futures:
Expand Down
6 changes: 3 additions & 3 deletions laygo/transformers/strategies/sequential.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@


# Strategy that applies the transformer to one chunk after another in the caller.
# NOTE(review): this span is a rendered diff hunk, so two versions of `execute`
# appear back to back. Presumably the `chunk_generator, data` form is the removed
# side -- confirm against the merged file.
class SequentialStrategy[In, Out](ExecutionStrategy[In, Out]):
def execute(self, transformer_logic, chunk_generator, data, context):
def execute(self, transformer_logic, chunks, context):
# Logic from the original Transformer.__call__
# Variant that chunks the raw data itself and flattens each result into items.
for chunk in chunk_generator(data):
yield from transformer_logic(chunk, context)
# Variant that receives ready-made chunks and yields each transformed chunk whole.
for chunk in chunks:
yield transformer_logic(chunk, context)
Loading
Loading