-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathprefect2_test_flow.py
More file actions
41 lines (31 loc) · 1022 Bytes
/
prefect2_test_flow.py
File metadata and controls
41 lines (31 loc) · 1022 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import functools

from prefect import flow, task, get_run_logger
from prefect.blocks.notifications import SlackWebhook
from prefect.context import FlowRunContext
@task
def print_task():
    """Write the greeting to stdout and to the Prefect run logger."""
    greeting = "Hello world!"
    print(greeting)
    get_run_logger().info(greeting)
def slack(func):
    """Decorate a flow function so its outcome is reported to Slack.

    The returned wrapper loads the ``SlackWebhook`` block named
    ``"mon-prefect"``, calls ``func`` and then sends either a success or a
    failure message that includes the current flow-run name.

    Args:
        func: The callable to wrap (intended to sit directly under ``@flow``).

    Returns:
        A wrapper with the same name, docstring and signature metadata as
        ``func`` that returns whatever ``func`` returns.

    Raises:
        Exception: Any exception raised by ``func`` is re-raised unchanged
            after the failure notification has been sent.
    """

    # Preserve func's metadata: ``@flow`` applied on top of this decorator
    # would otherwise see a function called "wrapper" with a bare
    # ``(*args, **kwargs)`` signature.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # There may be no active flow-run context (e.g. the wrapper is called
        # outside of a flow); fall back to None instead of raising
        # AttributeError before the wrapped function even runs.
        run_context = FlowRunContext.get()
        flow_run = run_context.flow_run if run_context is not None else None
        # Read the field directly rather than via the deprecated ``.dict()``.
        flow_run_name = flow_run.name if flow_run is not None else None

        slack_webhook = SlackWebhook.load("mon-prefect")
        try:
            result = func(*args, **kwargs)
        except Exception:
            slack_webhook.notify(f":bangbang: Flow-run failed. (*{flow_run_name}*)")
            raise
        else:
            # Kept outside the ``try`` body so that an error while sending the
            # success message is not misreported as a failed flow run.
            slack_webhook.notify(
                f":white_check_mark: Flow-run successful. (*{flow_run_name}*)"
            )
            return result

    return wrapper
@flow
@slack
def hello_world():
    """Log a start message and then run ``print_task``."""
    get_run_logger().info("Starting flow")
    print_task()
    # To exercise the failure path, trigger an error here, for example a
    # KeyError via test_dict["key"].
# Run the flow only when this file is executed as a script, not on import.
if __name__ == "__main__":
    hello_world()