From 8e1b19d5110d5d622e64cb08a2da9e8434dd70f1 Mon Sep 17 00:00:00 2001 From: ringoldsdev Date: Tue, 22 Jul 2025 19:05:36 +0000 Subject: [PATCH] chore: allow overriding chunker function --- laygo/transformers/transformer.py | 26 ++++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/laygo/transformers/transformer.py b/laygo/transformers/transformer.py index fe22cc0..8c4d3e5 100644 --- a/laygo/transformers/transformer.py +++ b/laygo/transformers/transformer.py @@ -30,6 +30,20 @@ def createTransformer[T](_type_hint: type[T], chunk_size: int = DEFAULT_CHUNK_SI return Transformer[T, T](chunk_size=chunk_size) # type: ignore +def build_chunk_generator[T](chunk_size: int) -> Callable[[Iterable[T]], Iterator[list[T]]]: + """ + Returns a function that breaks an iterable into chunks of a specified size. + This is useful for creating transformers that process data in manageable chunks. + """ + + def chunk_generator(data: Iterable[T]) -> Iterator[list[T]]: + data_iter = iter(data) + while chunk := list(itertools.islice(data_iter, chunk_size)): + yield chunk + + return chunk_generator + + class Transformer[In, Out]: """ Defines and composes data transformations by passing context explicitly. @@ -45,6 +59,7 @@ def __init__( # The default transformer now accepts and ignores a context argument. self.transformer: InternalTransformer[In, Out] = transformer or (lambda chunk, ctx: chunk) # type: ignore self.error_handler = ErrorHandler() + self._chunk_generator = build_chunk_generator(chunk_size) if chunk_size else lambda x: iter([list(x)]) @classmethod def from_transformer[T, U]( @@ -58,6 +73,11 @@ def from_transformer[T, U]( transformer=copy.deepcopy(transformer.transformer), # type: ignore ) + def set_chunker(self, chunker: Callable[[Iterable[In]], Iterator[list[In]]]) -> "Transformer[In, Out]": + """Sets a custom chunking function for the transformer.""" + self._chunk_generator = chunker + return self + def on_error(self, handler: ChunkErrorHandler[In, Out] | ErrorHandler) -> "Transformer[In, Out]": """Registers an error handler for the transformer.""" # This method is a placeholder for future error handling logic. @@ -69,12 +89,6 @@ def on_error(self, handler: ChunkErrorHandler[In, Out] | ErrorHandler) -> "Trans self.error_handler.on_error(handler) # type: ignore return self - def _chunk_generator(self, data: Iterable[In]) -> Iterator[list[In]]: - """Breaks an iterable into chunks of a specified size.""" - data_iter = iter(data) - while chunk := list(itertools.islice(data_iter, self.chunk_size)): - yield chunk - def _pipe[U](self, operation: Callable[[list[Out], PipelineContext], list[U]]) -> "Transformer[In, U]": """Composes the current transformer with a new context-aware operation.""" prev_transformer = self.transformer