Commit 670babc

chore: updated readme

1 parent 423e8d2

File tree: 2 files changed (+214, −2 lines)

README.md

Lines changed: 107 additions & 1 deletion
@@ -19,7 +19,7 @@

 **Laygo** is the lightweight Python library for data pipelines that I wish existed when I first started. It's designed from the ground up to make data engineering simpler, cleaner, and more intuitive, letting you build resilient, in-memory data workflows with an elegant, fluent API.

-It's built to grow with you. Scale seamlessly from a single local script to thousands of concurrent serverless functions with minimal operational overhead.
+It's built to grow with you. Scale seamlessly from a single local script to thousands of concurrent serverless functions with minimal operational overhead. Process data in parallel, branch into multiple analysis paths, and handle errors gracefully - all with the same clean, chainable syntax.

 **Key Features:**

@@ -31,6 +31,8 @@ It's built to grow with you. Scale seamlessly from a single local script to thou

 - **Effortless Parallelism**: Accelerate CPU-intensive tasks seamlessly.

+- **Fan-out Processing**: Split pipelines into multiple concurrent branches for parallel analysis of the same dataset.
+
 - **Distributed by Design**: Your pipeline script is both the manager and the worker. When deployed as a serverless function or a container, this design allows you to scale out massively by simply running more instances of the same code. Your logic scales the same way on a thousand cores as it does on one.

 - **Powerful Context Management**: Share state and configuration across your entire pipeline for advanced, stateful processing.
@@ -197,6 +199,110 @@ results = (
 )
 ```

+### Pipeline Branching (Fan-out Processing)
+
+```python
+from laygo import Pipeline
+from laygo.transformers.transformer import createTransformer
+
+# Sample data: customer orders
+orders = [
+    {"id": 1, "customer": "Alice", "amount": 150, "product": "laptop"},
+    {"id": 2, "customer": "Bob", "amount": 25, "product": "book"},
+    {"id": 3, "customer": "Charlie", "amount": 75, "product": "headphones"},
+    {"id": 4, "customer": "Diana", "amount": 200, "product": "monitor"},
+    {"id": 5, "customer": "Eve", "amount": 30, "product": "mouse"},
+]
+
+# Create different analysis branches
+high_value_analysis = (
+    createTransformer(dict)
+    .filter(lambda order: order["amount"] > 100)
+    .map(lambda order: {
+        "customer": order["customer"],
+        "amount": order["amount"],
+        "category": "high_value",
+    })
+)
+
+product_summary = (
+    createTransformer(dict)
+    .map(lambda order: {"product": order["product"], "count": 1})
+    # Group by product and sum counts (simplified example)
+)
+
+customer_spending = (
+    createTransformer(dict)
+    .map(lambda order: {
+        "customer": order["customer"],
+        "total_spent": order["amount"],
+    })
+)
+
+# Branch the pipeline into multiple concurrent analyses
+results = Pipeline(orders).branch({
+    "high_value_orders": high_value_analysis,
+    "products": product_summary,
+    "customer_totals": customer_spending,
+})
+
+print("High value orders:", results["high_value_orders"])
+# [{'customer': 'Alice', 'amount': 150, 'category': 'high_value'},
+#  {'customer': 'Diana', 'amount': 200, 'category': 'high_value'}]
+
+print("Product analysis:", len(results["products"]))
+# 5 (all products processed)
+
+print("Customer spending:", len(results["customer_totals"]))
+# 5 (all customers processed)
+```
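Editorial note, not part of the commit: the `product_summary` and `customer_spending` branches above stop at per-order records, and the diff leaves the grouping step as a comment. A minimal sketch of one way to finish both aggregations downstream using only the standard library, assuming the branch outputs are plain lists of dicts as the printed results suggest:

```python
from collections import Counter, defaultdict

# Hypothetical post-processing (not in the commit): complete the
# "group by product and sum counts" step from the branch output.
product_counts = Counter(row["product"] for row in results["products"])
# Counter({'laptop': 1, 'book': 1, 'headphones': 1, 'monitor': 1, 'mouse': 1})

# Likewise, sum per-customer spending from the customer_totals branch.
spending_by_customer: defaultdict[str, int] = defaultdict(int)
for row in results["customer_totals"]:
    spending_by_customer[row["customer"]] += row["total_spent"]
# {'Alice': 150, 'Bob': 25, 'Charlie': 75, 'Diana': 200, 'Eve': 30}
```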
+
+### Advanced Branching with Error Isolation
+
+```python
+from laygo import Pipeline
+from laygo.transformers.transformer import createTransformer
+
+# Data with potential issues
+mixed_data = [1, 2, "invalid", 4, 5, None, 7, 8]
+
+# Branch 1: Safe numeric processing
+safe_numbers = (
+    createTransformer(int | str | None)
+    .filter(lambda x: isinstance(x, int))
+    .map(lambda x: x * 2)
+)
+
+# Branch 2: String processing with error handling
+string_processing = (
+    createTransformer(int | str | None)
+    .filter(lambda x: isinstance(x, str))
+    .map(lambda x: f"processed_{x}")
+    .catch(lambda t: t.map(lambda x: "error_handled"))
+)
+
+# Branch 3: Statistical analysis
+stats_analysis = (
+    createTransformer(int | str | None)
+    .filter(lambda x: isinstance(x, int))
+    .map(lambda x: x)  # Pass through for stats
+)
+
+# Execute all branches concurrently
+results = Pipeline(mixed_data).branch({
+    "numbers": safe_numbers,
+    "strings": string_processing,
+    "stats": stats_analysis,
+}, batch_size=100)
+
+print("Processed numbers:", results["numbers"])  # [2, 4, 8, 10, 14, 16]
+print("Processed strings:", results["strings"])  # ['processed_invalid']
+print("Stats data:", results["stats"])           # [1, 2, 4, 5, 7, 8]
+
+# Each branch processes the complete dataset independently.
+# Errors in one branch don't affect the others.
+```
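Editorial note, not part of the commit: the `stats_analysis` branch above only passes the numeric values through; the statistics themselves are never computed in the diff. A hedged sketch of deriving them downstream from the branch output with the standard library:

```python
import statistics

# Hypothetical follow-up (not in the commit): summary statistics
# computed from the pass-through branch output shown above.
numbers = results["stats"]                 # [1, 2, 4, 5, 7, 8]
print("mean:", statistics.mean(numbers))   # 4.5
print("stdev:", statistics.stdev(numbers)) # sample standard deviation
```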
+
 ### Error Handling and Recovery

 ```python

wiki/Home.md

Lines changed: 107 additions & 1 deletion

The wiki/Home.md diff is identical to the README.md diff above: the same expanded intro sentence, the new **Fan-out Processing** feature bullet, and the same two new sections, "Pipeline Branching (Fan-out Processing)" and "Advanced Branching with Error Isolation".
