Skip to content

Commit 38c06f6

Browse files
committed
feat: ability to use queue chunks
1 parent ae6f5fd commit 38c06f6

File tree

3 files changed

+24
-1
lines changed

3 files changed

+24
-1
lines changed

laygo/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@
1414
from laygo.transformers.threaded import ThreadedTransformer
1515
from laygo.transformers.threaded import createThreadedTransformer
1616
from laygo.transformers.transformer import Transformer
17+
from laygo.transformers.transformer import build_chunk_generator
1718
from laygo.transformers.transformer import createTransformer
19+
from laygo.transformers.transformer import passthrough_chunks
1820

1921
__all__ = [
2022
"Pipeline",
@@ -28,4 +30,6 @@
2830
"createHTTPTransformer",
2931
"PipelineContext",
3032
"ErrorHandler",
33+
"passthrough_chunks",
34+
"build_chunk_generator",
3135
]

laygo/pipeline.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from laygo.helpers import PipelineContext
1515
from laygo.helpers import is_context_aware
1616
from laygo.transformers.transformer import Transformer
17+
from laygo.transformers.transformer import passthrough_chunks
1718

1819
T = TypeVar("T")
1920
U = TypeVar("U")
@@ -154,6 +155,7 @@ def branch(
154155
branches: dict[str, Transformer[T, Any]],
155156
batch_size: int = 1000,
156157
max_batch_buffer: int = 1,
158+
use_queue_chunks: bool = True,
157159
) -> dict[str, list[Any]]:
158160
"""Forks the pipeline into multiple branches for concurrent, parallel processing."""
159161
if not branches:
@@ -185,7 +187,10 @@ def consumer(transformer: Transformer, queue: Queue) -> list[Any]:
185187

186188
def stream_from_queue() -> Iterator[T]:
187189
while (batch := queue.get()) is not None:
188-
yield from batch
190+
yield batch
191+
192+
if use_queue_chunks:
193+
transformer = transformer.set_chunker(passthrough_chunks)
189194

190195
result_iterator = transformer(stream_from_queue(), self.ctx) # type: ignore
191196
return list(result_iterator)

laygo/transformers/transformer.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,20 @@ def chunk_generator(data: Iterable[T]) -> Iterator[list[T]]:
6060
return chunk_generator
6161

6262

63+
def passthrough_chunks[T](data: Iterable[list[T]]) -> Iterator[list[T]]:
64+
"""A chunk generator that yields the entire input as a single chunk.
65+
66+
This is useful for transformers that do not require chunking.
67+
68+
Args:
69+
data: The input data to process.
70+
71+
Returns:
72+
An iterator yielding the entire input as a single chunk.
73+
"""
74+
yield from iter(data)
75+
76+
6377
class Transformer[In, Out]:
6478
"""Define and compose data transformations by passing context explicitly.
6579

0 commit comments

Comments
 (0)