-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathalerts_dag.py
87 lines (72 loc) · 2.81 KB
/
alerts_dag.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
"""
This DAG shows DAG-level and task-level alerts.
"""
from airflow.decorators import dag, task
from pendulum import duration
from airflow.notifications.basenotifier import BaseNotifier
class MyNotifier(BaseNotifier):
    """
    Basic notifier, says hi.

    When invoked as a callback it prints the id and state of the task
    instance found under ``context["ti"]``.
    """

    def __init__(self):
        # Delegate to BaseNotifier so its own initialisation is not skipped
        # just because this subclass overrides __init__.
        super().__init__()

    def notify(self, context):
        """
        Print a greeting naming the task instance and its current state.

        :param context: mapping that must provide the task instance under
            the ``"ti"`` key; a missing key raises ``KeyError``.
        """
        t_id = context["ti"].task_id
        t_state = context["ti"].state
        print(f"Hi from MyNotifier! {t_id} finished as: {t_state}")
def my_callback_function(context):
    """
    Plain-function callback: print which task instance finished and how.

    :param context: mapping that provides the task instance under ``"ti"``.
    """
    task_instance = context["ti"]
    print(
        "Hi from my_callback_function! "
        f"{task_instance.task_id} finished as: {task_instance.state}"
    )
def _alert_callbacks():
    """Return a fresh ``[function, notifier]`` pair for one callback slot."""
    return [my_callback_function, MyNotifier()]


@dag(
    start_date=None,
    schedule=None,
    catchup=False,
    # DAG-level callbacks react to events that happen to the DAG itself
    on_success_callback=_alert_callbacks(),
    on_failure_callback=_alert_callbacks(),
    # NOTE(review): Airflow appears to call sla_miss_callback with five
    # positional arguments rather than a single context dict - confirm that
    # these callables cope with that before relying on SLA alerts.
    sla_miss_callback=_alert_callbacks(),
    # callbacks provided in default_args are given to all tasks in the DAG
    default_args=dict(
        on_execute_callback=_alert_callbacks(),
        on_retry_callback=_alert_callbacks(),
        on_success_callback=_alert_callbacks(),
        on_failure_callback=_alert_callbacks(),
        on_skipped_callback=_alert_callbacks(),  # Airflow 2.9+
        retries=3,
        retry_delay=duration(seconds=5),
    ),
    tags=["syntax_example"],
)
def callbacks_overview():
    # Two tasks that rely solely on the callbacks inherited from default_args.
    @task
    def task_succeeding():
        return 10

    @task
    def task_failing():
        return 10 / 0  # deliberately raises ZeroDivisionError

    # Two tasks that override the default_args callbacks at the task level.
    @task(
        on_execute_callback=_alert_callbacks(),
        on_retry_callback=_alert_callbacks(),
        on_success_callback=_alert_callbacks(),
        on_failure_callback=_alert_callbacks(),
        # Airflow 2.9+, only responds to AirflowSkipException
        on_skipped_callback=_alert_callbacks(),
    )
    def task_succeeding_task_level_callback():
        return 10

    @task(
        on_execute_callback=_alert_callbacks(),
        on_retry_callback=_alert_callbacks(),
        on_success_callback=_alert_callbacks(),
        on_failure_callback=_alert_callbacks(),
        # Airflow 2.9+, only responds to AirflowSkipException
        on_skipped_callback=_alert_callbacks(),
    )
    def task_failing_task_level_callback():
        return 10 / 0  # deliberately raises ZeroDivisionError

    # Instantiate the tasks in the same order as before.
    task_succeeding()
    task_failing()
    task_succeeding_task_level_callback()
    task_failing_task_level_callback()


callbacks_overview()