
Commit 455849b

Merge pull request #11 from ringoldsdev/feat/20250723/cleanup
Feat/20250723/cleanup
2 parents 19f67df + 350938c commit 455849b

3 files changed: +335, -18 lines changed

README.md

Lines changed: 107 additions & 1 deletion
@@ -19,7 +19,7 @@

**Laygo** is the lightweight Python library for data pipelines that I wish existed when I first started. It's designed from the ground up to make data engineering simpler, cleaner, and more intuitive, letting you build resilient, in-memory data workflows with an elegant, fluent API.

It's built to grow with you. Scale seamlessly from a single local script to thousands of concurrent serverless functions with minimal operational overhead.

**Key Features:**

@@ -31,6 +31,8 @@ It's built to grow with you. Scale seamlessly from a single local script to thou

- **Effortless Parallelism**: Accelerate CPU-intensive tasks seamlessly.

- **Fan-out Processing**: Split pipelines into multiple concurrent branches for parallel analysis of the same dataset.

- **Distributed by Design**: Your pipeline script is both the manager and the worker. When deployed as a serverless function or a container, this design allows you to scale out massively by simply running more instances of the same code. Your logic scales the same way on a thousand cores as it does on one.

- **Powerful Context Management**: Share state and configuration across your entire pipeline for advanced, stateful processing.

@@ -197,6 +199,110 @@ results = (
)
```

### Pipeline Branching (Fan-out Processing)

```python
from laygo import Pipeline
from laygo.transformers.transformer import createTransformer

# Sample data: customer orders
orders = [
    {"id": 1, "customer": "Alice", "amount": 150, "product": "laptop"},
    {"id": 2, "customer": "Bob", "amount": 25, "product": "book"},
    {"id": 3, "customer": "Charlie", "amount": 75, "product": "headphones"},
    {"id": 4, "customer": "Diana", "amount": 200, "product": "monitor"},
    {"id": 5, "customer": "Eve", "amount": 30, "product": "mouse"},
]

# Create different analysis branches
high_value_analysis = (
    createTransformer(dict)
    .filter(lambda order: order["amount"] > 100)
    .map(lambda order: {
        "customer": order["customer"],
        "amount": order["amount"],
        "category": "high_value"
    })
)

product_summary = (
    createTransformer(dict)
    .map(lambda order: {"product": order["product"], "count": 1})
    # Group by product and sum counts (simplified example)
)

customer_spending = (
    createTransformer(dict)
    .map(lambda order: {
        "customer": order["customer"],
        "total_spent": order["amount"]
    })
)

# Branch the pipeline into multiple concurrent analyses
results = Pipeline(orders).branch({
    "high_value_orders": high_value_analysis,
    "products": product_summary,
    "customer_totals": customer_spending
})

print("High value orders:", results["high_value_orders"])
# [{'customer': 'Alice', 'amount': 150, 'category': 'high_value'},
# {'customer': 'Diana', 'amount': 200, 'category': 'high_value'}]

print("Product analysis:", len(results["products"]))
# 5 (all products processed)

print("Customer spending:", len(results["customer_totals"]))
# 5 (all customers processed)
```
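Note that `branch` is a terminal operation: it drains the pipeline's iterator, so the same `Pipeline` instance cannot be reused for further work. A minimal sketch of that behaviour, reusing the `orders` data above (the bare `createTransformer(dict)` pass-through branch is assumed here purely for illustration):

```python
# branch() consumes the pipeline; a second terminal call on the same
# instance is expected to return empty results.
pipeline = Pipeline(orders)
snapshot = pipeline.branch({"all": createTransformer(dict)})  # assumed pass-through branch
leftover = pipeline.to_list()  # [] - the iterator is already exhausted
```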

### Advanced Branching with Error Isolation

```python
from laygo import Pipeline
from laygo.transformers.transformer import createTransformer

# Data with potential issues
mixed_data = [1, 2, "invalid", 4, 5, None, 7, 8]

# Branch 1: Safe numeric processing
safe_numbers = (
    createTransformer(int | str | None)
    .filter(lambda x: isinstance(x, int) and x is not None)
    .map(lambda x: x * 2)
)

# Branch 2: String processing with error handling
string_processing = (
    createTransformer(int | str | None)
    .filter(lambda x: isinstance(x, str))
    .map(lambda x: f"processed_{x}")
    .catch(lambda t: t.map(lambda x: "error_handled"))
)

# Branch 3: Statistical analysis
stats_analysis = (
    createTransformer(int | str | None)
    .filter(lambda x: isinstance(x, int) and x is not None)
    .map(lambda x: x)  # Pass through for stats
)

# Execute all branches concurrently
results = Pipeline(mixed_data).branch({
    "numbers": safe_numbers,
    "strings": string_processing,
    "stats": stats_analysis
}, batch_size=100)

print("Processed numbers:", results["numbers"])  # [2, 4, 8, 10, 14, 16]
print("Processed strings:", results["strings"])  # ['processed_invalid']
print("Stats data:", results["stats"])  # [1, 2, 4, 5, 7, 8]

# Each branch processes the complete dataset independently
# Errors in one branch don't affect others
```

### Error Handling and Recovery

laygo/pipeline.py

Lines changed: 121 additions & 16 deletions
@@ -27,6 +27,21 @@ class Pipeline[T]:
A Pipeline provides a high-level interface for data processing by chaining
transformers together. It automatically manages a multiprocessing-safe
shared context that can be accessed by all transformers in the chain.
+
+The Pipeline supports both streaming and batch processing patterns, with
+built-in support for buffering, branching (fan-out), and parallel processing.
+
+Example:
+    >>> data = [1, 2, 3, 4, 5]
+    >>> result = (Pipeline(data)
+    ...     .transform(lambda t: t.filter(lambda x: x % 2 == 0))
+    ...     .transform(lambda t: t.map(lambda x: x * 2))
+    ...     .to_list())
+    >>> result # [4, 8]
+
+Note:
+    Most pipeline operations consume the internal iterator, making the
+    pipeline effectively single-use unless the data source is re-initialized.
"""

def __init__(self, *data: Iterable[T]) -> None:
@@ -64,14 +79,22 @@ def __del__(self) -> None:
def context(self, ctx: PipelineContext) -> "Pipeline[T]":
    """Update the pipeline context and store a reference to the original context.

-   When the pipeline finishes processing, the original context will be updated
-   with the final pipeline context data.
+   The provided context will be used during pipeline execution and any
+   modifications made by transformers will be synchronized back to the
+   original context when the pipeline finishes processing.

    Args:
-       ctx: The pipeline context to use for this pipeline execution.
+       ctx: The pipeline context dictionary to use for this pipeline execution.
+           This should be a mutable dictionary-like object that transformers
+           can use to share state and communicate.

    Returns:
        The pipeline instance for method chaining.
+
+   Note:
+       Changes made to the context during pipeline execution will be
+       automatically synchronized back to the original context object
+       when the pipeline is destroyed or processing completes.
    """
    # Store reference to the original context
    self._original_context_ref = ctx
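A rough usage sketch of the context flow described above. It assumes a plain dict is acceptable where a `PipelineContext` is expected and uses the context-aware callable form that `apply` documents below; both are assumptions for illustration, not the only supported shapes:

```python
from laygo import Pipeline

ctx = {"count": 0}  # assumed: a plain dict standing in for PipelineContext

def count_items(data, context):
    # Context-aware callable: receives the iterable and the shared context.
    for item in data:
        context["count"] = context.get("count", 0) + 1
        yield item

result = Pipeline(["a", "b", "c"]).context(ctx).apply(count_items).to_list()
# result == ["a", "b", "c"]; once processing completes, ctx["count"] should be
# synchronized back to 3, per the docstring above.
```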
@@ -96,13 +119,21 @@ def transform[U](self, t: Callable[[Transformer[T, T]], Transformer[T, U]]) -> "
    """Apply a transformation using a lambda function.

    Creates a Transformer under the hood and applies it to the pipeline.
-   This is a shorthand method for simple transformations.
+   This is a shorthand method for simple transformations that allows
+   chaining transformer operations in a functional style.

    Args:
        t: A callable that takes a transformer and returns a transformed transformer.
+           Typically used with lambda expressions like:
+           `lambda t: t.map(func).filter(predicate)`

    Returns:
-       A new Pipeline with the transformed data.
+       A new Pipeline with the transformed data type.
+
+   Example:
+       >>> pipeline = Pipeline([1, 2, 3, 4, 5])
+       >>> result = pipeline.transform(lambda t: t.filter(lambda x: x % 2 == 0).map(lambda x: x * 2))
+       >>> result.to_list() # [4, 8]
    """
    # Create a new transformer and apply the transformation function
    transformer = t(Transformer[T, T]())
@@ -125,17 +156,28 @@ def apply[U](
) -> "Pipeline[U]":
    """Apply a transformer to the current data source.

-   The pipeline's managed context is passed down to the transformer.
+   This method accepts various types of transformers and applies them to
+   the pipeline data. The pipeline's managed context is automatically
+   passed to context-aware transformers.

    Args:
-       transformer: Either a Transformer instance or a callable function
-           that processes the data.
+       transformer: One of the following:
+           - A Transformer instance (preferred for complex operations)
+           - A callable function that takes an iterable and returns an iterator
+           - A context-aware callable that takes an iterable and context

    Returns:
-       A new Pipeline with the transformed data.
+       The same Pipeline instance with transformed data (for method chaining).

    Raises:
        TypeError: If the transformer is not a supported type.
+
+   Example:
+       >>> pipeline = Pipeline([1, 2, 3])
+       >>> # Using a Transformer instance
+       >>> pipeline.apply(createTransformer(int).map(lambda x: x * 2))
+       >>> # Using a simple function
+       >>> pipeline.apply(lambda data: (x * 2 for x in data))
    """
    match transformer:
        case Transformer():
@@ -157,7 +199,34 @@ def branch(
    max_batch_buffer: int = 1,
    use_queue_chunks: bool = True,
) -> dict[str, list[Any]]:
-   """Forks the pipeline into multiple branches for concurrent, parallel processing."""
+   """Forks the pipeline into multiple branches for concurrent, parallel processing.
+
+   This is a **terminal operation** that implements a fan-out pattern where
+   the entire dataset is copied to each branch for independent processing.
+   Each branch processes the complete dataset concurrently using separate
+   transformers, and results are collected and returned in a dictionary.
+
+   Args:
+       branches: A dictionary where keys are branch names (str) and values
+           are `Transformer` instances of any subtype.
+       batch_size: The number of items to batch together when sending data
+           to branches. Larger batches can improve throughput but
+           use more memory. Defaults to 1000.
+       max_batch_buffer: The maximum number of batches to buffer for each
+           branch queue. Controls memory usage and creates
+           backpressure. Defaults to 1.
+       use_queue_chunks: Whether to use passthrough chunking for the
+           transformers. When True, batches are processed
+           as chunks. Defaults to True.
+
+   Returns:
+       A dictionary where keys are the branch names and values are lists
+       of all items processed by that branch's transformer.
+
+   Note:
+       This operation consumes the pipeline's iterator, making subsequent
+       operations on the same pipeline return empty results.
+   """
    if not branches:
        self.consume()
        return {}
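A short sketch of how these parameters might be tuned, using the `createTransformer` helper shown in the README; the specific values and branch names are illustrative only:

```python
from laygo import Pipeline
from laygo.transformers.transformer import createTransformer

evens = createTransformer(int).filter(lambda x: x % 2 == 0)
doubled = createTransformer(int).map(lambda x: x * 2)

results = Pipeline(range(10_000)).branch(
    {"evens": evens, "doubled": doubled},
    batch_size=500,      # smaller batches keep each queued chunk lighter
    max_batch_buffer=2,  # allow two in-flight batches per branch before backpressure
)
# Each branch sees the full input; results["evens"] and results["doubled"]
# are independent views of the same 10,000 items.
```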
@@ -258,48 +327,84 @@ def _producer() -> None:
def __iter__(self) -> Iterator[T]:
    """Allow the pipeline to be iterated over.

+   This makes the Pipeline compatible with Python's iterator protocol,
+   allowing it to be used in for loops, list comprehensions, and other
+   contexts that expect an iterable.
+
    Returns:
        An iterator over the processed data.
+
+   Note:
+       This operation consumes the pipeline's iterator, making subsequent
+       operations on the same pipeline return empty results.
    """
    yield from self.processed_data

def to_list(self) -> list[T]:
    """Execute the pipeline and return the results as a list.

+   This is a terminal operation that consumes the pipeline's iterator
+   and materializes all results into memory.
+
    Returns:
        A list containing all processed items from the pipeline.
+
+   Note:
+       This operation consumes the pipeline's iterator, making subsequent
+       operations on the same pipeline return empty results.
    """
    return list(self.processed_data)

def each(self, function: PipelineFunction[T]) -> None:
    """Apply a function to each element (terminal operation).

+   This is a terminal operation that processes each element for side effects
+   and consumes the pipeline's iterator without returning results.
+
    Args:
-       function: The function to apply to each element.
+       function: The function to apply to each element. Should be used for
+           side effects like logging, updating external state, etc.
+
+   Note:
+       This operation consumes the pipeline's iterator, making subsequent
+       operations on the same pipeline return empty results.
    """
    for item in self.processed_data:
        function(item)

def first(self, n: int = 1) -> list[T]:
    """Get the first n elements of the pipeline (terminal operation).

+   This is a terminal operation that consumes up to n elements from the
+   pipeline's iterator and returns them as a list.
+
    Args:
-       n: The number of elements to retrieve.
+       n: The number of elements to retrieve. Must be at least 1.

    Returns:
-       A list containing the first n elements.
+       A list containing the first n elements, or fewer if the pipeline
+       contains fewer than n elements.

    Raises:
        AssertionError: If n is less than 1.
+
+   Note:
+       This operation partially consumes the pipeline's iterator. Subsequent
+       operations will continue from where this operation left off.
    """
    assert n >= 1, "n must be at least 1"
    return list(itertools.islice(self.processed_data, n))

def consume(self) -> None:
-   """Consume the pipeline without returning results.
+   """Consume the pipeline without returning results (terminal operation).
+
+   This is a terminal operation that processes all elements in the pipeline
+   for their side effects without materializing any results. Useful when
+   the pipeline operations have side effects and you don't need the results.

-   This is useful when you want to execute the pipeline for side effects
-   without collecting the results.
+   Note:
+       This operation consumes the pipeline's iterator, making subsequent
+       operations on the same pipeline return empty results.
    """
    for _ in self.processed_data:
        pass
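Read together, these docstrings imply the following consumption behaviour; a hedged sketch with no transformers applied, so items pass through unchanged:

```python
from laygo import Pipeline

p = Pipeline([1, 2, 3, 4, 5])
head = p.first(2)   # [1, 2] - partially consumes the iterator
rest = p.to_list()  # expected [3, 4, 5] - continues where first() stopped
p.consume()         # nothing left; a consumed pipeline yields no further items
```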
