-
Hi everyone,

I have a DAG that is used for multiple customers who have subscribed to an app. The DAG has multiple tasks using various operators. When a customer deletes the app, the corresponding resources are removed from the database, and the tasks are expected to fail for that customer.

What I want to do is check the customer's activity status whenever any task fails, and skip the retry if the customer is no longer subscribed. I know that AirflowFailException can be used to skip retries on failure; however, I want this behavior to be generic so that it applies to failures/retries of all tasks and operators (not only the Python operator).

I've created the function below and passed it as a retry callback:

    from airflow.exceptions import AirflowFailException

    def dag_retry(context):
        print("Running dag retry callback.")
        # Do not retry if a customer is not active.
        customer = context["params"]["customer"]
        if not check_if_customer_active(customer):
            raise AirflowFailException(
                f"Customer {customer} is not active anymore! "
                "No more retry is needed."
            )

When a task of an unsubscribed customer fails for the first time, this function is called, and [...]. Do you have any idea how to solve this?

Thanks in advance.
Replies: 1 comment 2 replies
-
You could try to set the status of the Task Instance to failed:

    from airflow.utils.state import TaskInstanceState

    def retry_callback(context) -> None:
        if True:  # some condition
            ti = context["ti"]
            print(f"Set task instance {ti} state to {TaskInstanceState.FAILED}")
            # Mark the task instance as failed instead of leaving it up for retry.
            ti.set_state(TaskInstanceState.FAILED)
        return
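To tie the suggestion above back to the original question, here is a minimal sketch that combines the customer check with setting the task instance state. It is an illustration under assumptions, not a confirmed fix: `make_retry_callback` and `is_active` are hypothetical names (the latter stands in for `check_if_customer_active`), the plain string `"failed"` is used because it is the value of `TaskInstanceState.FAILED`, and whether the scheduler then skips the retry may depend on the Airflow version.

```python
from typing import Any, Callable, Mapping

# Same string value as airflow.utils.state.TaskInstanceState.FAILED;
# written as a literal so the sketch runs without Airflow installed.
FAILED = "failed"


def make_retry_callback(
    is_active: Callable[[str], bool],
) -> Callable[[Mapping[str, Any]], None]:
    """Build a retry callback that fails tasks of inactive customers.

    `is_active` is a hypothetical stand-in for check_if_customer_active.
    """

    def retry_callback(context: Mapping[str, Any]) -> None:
        customer = context["params"]["customer"]
        if is_active(customer):
            # Customer is still subscribed: leave the task up for retry.
            return
        ti = context["ti"]
        print(f"Customer {customer} is not active; setting {ti} to {FAILED}")
        # Mark the task instance as failed so it is not left up for retry.
        ti.set_state(FAILED)

    return retry_callback
```

If it behaves as hoped, registering it once through the DAG's `default_args` (for example `default_args={"on_retry_callback": make_retry_callback(check_if_customer_active)}`) should apply it to every task regardless of operator type, since `on_retry_callback` is a `BaseOperator` argument.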