-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathtask_flow_api_dag.py
127 lines (94 loc) · 2.71 KB
/
task_flow_api_dag.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
"""
This DAG shows how to use common Airflow decorators.
- PythonOperator -> @task
- BashOperator -> @task.bash
- BranchPythonOperator -> @task.branch
- PythonSensor -> @task.sensor
See also:
- Learn guide: https://www.astronomer.io/docs/learn/airflow-decorators
"""
from airflow.decorators import dag, task
from airflow.operators.bash import BashOperator
from airflow.operators.python import BranchPythonOperator, PythonOperator
from airflow.sensors.python import PythonSensor
from airflow.operators.empty import EmptyOperator
from airflow.models.baseoperator import chain
@dag(
    start_date=None,
    schedule=None,
    catchup=False,
    tags=["syntax_example"],
    default_args={"retries": 3},
)
def task_flow_api_dag():
    # Side-by-side tour of four operator styles: each section first builds a
    # task with the traditional operator class, then builds the same kind of
    # task with the matching TaskFlow decorator.

    # --- PythonOperator  <->  @task --------------------------------------
    def _say_hi():
        print("Hi!")

    PythonOperator(
        task_id="python_hi_traditional",
        python_callable=_say_hi,
    )

    @task
    def python_hi_taskflow():
        print("Hi!")

    python_hi_taskflow()

    # --- BashOperator  <->  @task.bash -----------------------------------
    BashOperator(
        task_id="bash_hello_traditional",
        bash_command="echo Hello!",
    )

    @task.bash
    def bash_hello_taskflow():
        # The decorated function returns the shell command as a string.
        return "echo Hello!"

    bash_hello_taskflow()

    # --- BranchPythonOperator  <->  @task.branch -------------------------
    def _branching():
        # Coin flip: return the task_id of one of the two downstream tasks.
        import random

        heads = random.choice([True, False])
        return "branch_a_traditional" if heads else "branch_b_traditional"

    traditional_branch = BranchPythonOperator(
        task_id="branching_traditional",
        python_callable=_branching,
    )
    traditional_branch >> [
        EmptyOperator(task_id="branch_a_traditional"),
        EmptyOperator(task_id="branch_b_traditional"),
    ]

    @task.branch
    def branching_taskflow():
        # Coin flip: return the task_id of one of the two downstream tasks.
        import random

        heads = random.choice([True, False])
        return "branch_a_taskflow" if heads else "branch_b_taskflow"

    # The two targets are created before the branching task itself, matching
    # the order in which the tasks were originally added to the DAG.
    taskflow_targets = [
        EmptyOperator(task_id="branch_a_taskflow"),
        EmptyOperator(task_id="branch_b_taskflow"),
    ]
    chain(branching_taskflow(), taskflow_targets)

    # --- PythonSensor  <->  @task.sensor ---------------------------------
    def _wait_for_it():
        # Sleep ten seconds, then randomly report True or False.
        import random
        import time

        time.sleep(10)
        return random.choice([True, False])

    PythonSensor(
        task_id="sensor_traditional",
        poke_interval=10,
        timeout=3600,
        mode="poke",
        python_callable=_wait_for_it,
    )

    @task.sensor(poke_interval=10, timeout=3600, mode="poke")
    def sensor_taskflow():
        # Sleep ten seconds, then randomly report True or False.
        import random
        import time

        time.sleep(10)
        return random.choice([True, False])

    sensor_taskflow()
# Call the @dag-decorated function at module level so the DAG is built when this file is loaded.
task_flow_api_dag()