-
Notifications
You must be signed in to change notification settings - Fork 6.3k
/
Copy path01_getting_started_data_pipeline.yaml
55 lines (47 loc) · 1.38 KB
/
01_getting_started_data_pipeline.yaml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# Kestra flow: download a product catalogue, keep only selected fields with a
# Python script, then aggregate the trimmed data with DuckDB.
id: 01_getting_started_data_pipeline
namespace: zoomcamp

inputs:
# Product fields the transform task keeps for each product.
# The query task's SQL references `brand` and `price`, so both must stay in
# the list for the flow to succeed end to end.
- id: columns_to_keep
  type: ARRAY
  itemType: STRING
  defaults:
  - brand
  - price
tasks:
# Download the product catalogue. The transform task reads the downloaded
# file through `outputs.extract.uri`.
- id: extract
  type: io.kestra.plugin.core.http.Download
  uri: https://dummyjson.com/products
# Reduce every product to the requested columns and write the result to
# products.json, which the query task consumes.
- id: transform
  type: io.kestra.plugin.scripts.python.Script
  containerImage: python:3.11-alpine
  inputFiles:
    # The file downloaded by the extract task, exposed to the script as data.json.
    data.json: "{{outputs.extract.uri}}"
  outputFiles:
  # The glob matches every JSON file in the working directory, not only
  # products.json.
  - "*.json"
  env:
    # The script parses this value with json.loads, so it must render as a
    # JSON list.
    # NOTE(review): confirm Kestra renders an ARRAY input this way.
    COLUMNS_TO_KEEP: "{{inputs.columns_to_keep}}"
  script: |
    import json
    import os

    # Column names arrive as a JSON-encoded list in the environment.
    columns_to_keep_str = os.getenv("COLUMNS_TO_KEEP")
    columns_to_keep = json.loads(columns_to_keep_str)

    # Explicit encoding so reading does not depend on the container locale.
    with open("data.json", "r", encoding="utf-8") as file:
        data = json.load(file)

    # Keep only the requested columns; a column missing from a product is
    # filled with the placeholder "N/A".
    filtered_data = [
        {column: product.get(column, "N/A") for column in columns_to_keep}
        for product in data["products"]
    ]

    with open("products.json", "w", encoding="utf-8") as file:
        json.dump(filtered_data, file, indent=4)
# Compute the average price per brand from the transform task's products.json.
- id: query
  type: io.kestra.plugin.jdbc.duckdb.Query
  inputFiles:
    # Double quotes are required here: the expression itself contains single
    # quotes.
    products.json: "{{outputs.transform.outputFiles['products.json']}}"
  # `{{workingDir}}` is rendered by Kestra before the SQL runs.
  # NOTE(review): presumably the directory where inputFiles are placed; verify.
  sql: |
    INSTALL json;
    LOAD json;
    SELECT brand, round(avg(price), 2) as avg_price
    FROM read_json_auto('{{workingDir}}/products.json')
    GROUP BY brand
    ORDER BY avg_price DESC;
  # NOTE(review): STORE presumably persists the result set to Kestra's
  # internal storage instead of returning rows inline; confirm in plugin docs.
  fetchType: STORE