diff --git a/airflow/api_internal/endpoints/rpc_api_endpoint.py b/airflow/api_internal/endpoints/rpc_api_endpoint.py index e4a5069b29bc..8716d9c9cc49 100644 --- a/airflow/api_internal/endpoints/rpc_api_endpoint.py +++ b/airflow/api_internal/endpoints/rpc_api_endpoint.py @@ -101,7 +101,6 @@ def initialize_method_map() -> dict[str, Callable]: DagFileProcessor._execute_task_callbacks, DagFileProcessor.execute_callbacks, DagFileProcessor.execute_callbacks_without_dag, - DagFileProcessor.manage_slas, DagFileProcessor.save_dag_to_db, DagFileProcessor.update_import_errors, DagFileProcessor._validate_task_pools_and_update_dag_warnings, diff --git a/airflow/callbacks/callback_requests.py b/airflow/callbacks/callback_requests.py index 7158c45d44d9..07ad648e9630 100644 --- a/airflow/callbacks/callback_requests.py +++ b/airflow/callbacks/callback_requests.py @@ -137,23 +137,3 @@ def __init__( self.dag_id = dag_id self.run_id = run_id self.is_failure_callback = is_failure_callback - - -class SlaCallbackRequest(CallbackRequest): - """ - A class with information about the SLA callback to be executed. - - :param full_filepath: File Path to use to run the callback - :param dag_id: DAG ID - :param processor_subdir: Directory used by Dag Processor when parsed the dag. - """ - - def __init__( - self, - full_filepath: str, - dag_id: str, - processor_subdir: str | None, - msg: str | None = None, - ): - super().__init__(full_filepath, processor_subdir=processor_subdir, msg=msg) - self.dag_id = dag_id diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 3bef18058dfb..c9abee3c8506 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -395,13 +395,6 @@ core: type: integer example: ~ default: "30" - check_slas: - description: | - On each dagrun check against defined SLAs - version_added: 1.10.8 - type: string - example: ~ - default: "True" xcom_backend: description: | Path to custom XCom class that will be used to store and resolve operators results diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py index 6df8060f3a31..05fb72daee60 100644 --- a/airflow/dag_processing/manager.py +++ b/airflow/dag_processing/manager.py @@ -42,7 +42,7 @@ import airflow.models from airflow.api_internal.internal_api_call import internal_api_call -from airflow.callbacks.callback_requests import CallbackRequest, SlaCallbackRequest +from airflow.callbacks.callback_requests import CallbackRequest from airflow.configuration import conf from airflow.dag_processing.processor import DagFileProcessorProcess from airflow.models.dag import DagModel @@ -752,40 +752,17 @@ def _fetch_callbacks_with_retries( return callback_queue def _add_callback_to_queue(self, request: CallbackRequest): - # requests are sent by dag processors. SLAs exist per-dag, but can be generated once per SLA-enabled - # task in the dag. If treated like other callbacks, SLAs can cause feedback where a SLA arrives, - # goes to the front of the queue, gets processed, triggers more SLAs from the same DAG, which go to - # the front of the queue, and we never get round to picking stuff off the back of the queue - if isinstance(request, SlaCallbackRequest): - if request in self._callback_to_execute[request.full_filepath]: - self.log.debug("Skipping already queued SlaCallbackRequest") - return - - # not already queued, queue the callback - # do NOT add the file of this SLA to self._file_path_queue. SLAs can arrive so rapidly that - # they keep adding to the file queue and never letting it drain. This in turn prevents us from - # ever rescanning the dags folder for changes to existing dags. We simply store the callback, and - # periodically, when self._file_path_queue is drained, we rescan and re-queue all DAG files. - # The SLAs will be picked up then. It means a delay in reacting to the SLAs (as controlled by the - # min_file_process_interval config) but stops SLAs from DoS'ing the queue. - self.log.debug("Queuing SlaCallbackRequest for %s", request.dag_id) - self._callback_to_execute[request.full_filepath].append(request) - Stats.incr("dag_processing.sla_callback_count") - - # Other callbacks have a higher priority over DAG Run scheduling, so those callbacks gazump, even if - # already in the file path queue - else: - self.log.debug("Queuing %s CallbackRequest: %s", type(request).__name__, request) - self._callback_to_execute[request.full_filepath].append(request) - if request.full_filepath in self._file_path_queue: - # Remove file paths matching request.full_filepath from self._file_path_queue - # Since we are already going to use that filepath to run callback, - # there is no need to have same file path again in the queue - self._file_path_queue = deque( - file_path for file_path in self._file_path_queue if file_path != request.full_filepath - ) - self._add_paths_to_queue([request.full_filepath], True) - Stats.incr("dag_processing.other_callback_count") + self.log.debug("Queuing %s CallbackRequest: %s", type(request).__name__, request) + self._callback_to_execute[request.full_filepath].append(request) + if request.full_filepath in self._file_path_queue: + # Remove file paths matching request.full_filepath from self._file_path_queue + # Since we are already going to use that filepath to run callback, + # there is no need to have same file path again in the queue + self._file_path_queue = deque( + file_path for file_path in self._file_path_queue if file_path != request.full_filepath + ) + self._add_paths_to_queue([request.full_filepath], True) + Stats.incr("dag_processing.other_callback_count") def _refresh_requested_filelocs(self) -> None: """Refresh filepaths from dag dir as requested by users via APIs.""" diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py index 0b19d8f2db76..f030cb75019e 100644 --- a/airflow/dag_processing/processor.py +++ b/airflow/dag_processing/processor.py @@ -25,33 +25,28 @@ import zipfile from contextlib import contextmanager, redirect_stderr, redirect_stdout, suppress from dataclasses import dataclass -from datetime import timedelta -from typing import TYPE_CHECKING, Generator, Iterable, Iterator +from typing import TYPE_CHECKING, Generator, Iterable from setproctitle import setproctitle -from sqlalchemy import delete, event, func, or_, select +from sqlalchemy import delete, event from airflow import settings -from airflow.api_internal.internal_api_call import InternalApiConfig, internal_api_call +from airflow.api_internal.internal_api_call import internal_api_call from airflow.callbacks.callback_requests import ( DagCallbackRequest, - SlaCallbackRequest, TaskCallbackRequest, ) from airflow.configuration import conf -from airflow.exceptions import AirflowException, TaskNotFound +from airflow.exceptions import AirflowException from airflow.listeners.listener import get_listener_manager -from airflow.models import SlaMiss from airflow.models.dag import DAG, DagModel from airflow.models.dagbag import DagBag -from airflow.models.dagrun import DagRun as DR from airflow.models.dagwarning import DagWarning, DagWarningType from airflow.models.errors import ParseImportError from airflow.models.serialized_dag import SerializedDagModel -from airflow.models.taskinstance import TaskInstance, TaskInstance as TI, _run_finished_callback +from airflow.models.taskinstance import TaskInstance, _run_finished_callback from airflow.stats import Stats from airflow.utils import timezone -from airflow.utils.email import get_email_address_list, send_email from airflow.utils.file import iter_airflow_imports, might_contain_dag from airflow.utils.log.logging_mixin import LoggingMixin, StreamLogWriter, set_context from airflow.utils.mixins import MultiprocessingStartMethodMixin @@ -440,180 +435,6 @@ def __init__(self, dag_ids: list[str] | None, dag_directory: str, log: logging.L self.dag_warnings: set[tuple[str, str]] = set() self._last_num_of_db_queries = 0 - @classmethod - @internal_api_call - @provide_session - def manage_slas(cls, dag_folder, dag_id: str, session: Session = NEW_SESSION) -> None: - """ - Find all tasks that have SLAs defined, and send alert emails when needed. - - New SLA misses are also recorded in the database. - - We are assuming that the scheduler runs often, so we only check for - tasks that should have succeeded in the past hour. - """ - dagbag = DagFileProcessor._get_dagbag(dag_folder) - dag = dagbag.get_dag(dag_id) - cls.logger().info("Running SLA Checks for %s", dag.dag_id) - if not any(isinstance(ti.sla, timedelta) for ti in dag.tasks): - cls.logger().info("Skipping SLA check for %s because no tasks in DAG have SLAs", dag) - return - qry = ( - select(TI.task_id, func.max(DR.execution_date).label("max_ti")) - .join(TI.dag_run) - .where(TI.dag_id == dag.dag_id) - .where(or_(TI.state == TaskInstanceState.SUCCESS, TI.state == TaskInstanceState.SKIPPED)) - .where(TI.task_id.in_(dag.task_ids)) - .group_by(TI.task_id) - .subquery("sq") - ) - # get recorded SlaMiss - recorded_slas_query = set( - session.execute( - select(SlaMiss.dag_id, SlaMiss.task_id, SlaMiss.execution_date).where( - SlaMiss.dag_id == dag.dag_id, SlaMiss.task_id.in_(dag.task_ids) - ) - ) - ) - max_tis: Iterator[TI] = session.scalars( - select(TI) - .join(TI.dag_run) - .where(TI.dag_id == dag.dag_id, TI.task_id == qry.c.task_id, DR.execution_date == qry.c.max_ti) - ) - - ts = timezone.utcnow() - - for ti in max_tis: - task = dag.get_task(ti.task_id) - if not task.sla: - continue - - if not isinstance(task.sla, timedelta): - raise TypeError( - f"SLA is expected to be timedelta object, got " - f"{type(task.sla)} in {task.dag_id}:{task.task_id}" - ) - - sla_misses = [] - next_info = dag.next_dagrun_info(dag.get_run_data_interval(ti.dag_run), restricted=False) - while next_info and next_info.logical_date < ts: - next_info = dag.next_dagrun_info(next_info.data_interval, restricted=False) - - if next_info is None: - break - if (ti.dag_id, ti.task_id, next_info.logical_date) in recorded_slas_query: - continue - if next_info.logical_date + task.sla < ts: - sla_miss = SlaMiss( - task_id=ti.task_id, - dag_id=ti.dag_id, - execution_date=next_info.logical_date, - timestamp=ts, - ) - sla_misses.append(sla_miss) - Stats.incr("sla_missed", tags={"dag_id": ti.dag_id, "task_id": ti.task_id}) - if sla_misses: - session.add_all(sla_misses) - session.commit() - slas: list[SlaMiss] = session.scalars( - select(SlaMiss).where(~SlaMiss.notification_sent, SlaMiss.dag_id == dag.dag_id) - ).all() - if slas: - sla_dates: list[datetime] = [sla.execution_date for sla in slas] - fetched_tis: list[TI] = session.scalars( - select(TI).where( - TI.dag_id == dag.dag_id, - TI.execution_date.in_(sla_dates), - TI.state != TaskInstanceState.SUCCESS, - ) - ).all() - blocking_tis: list[TI] = [] - for ti in fetched_tis: - if ti.task_id in dag.task_ids: - ti.task = dag.get_task(ti.task_id) - blocking_tis.append(ti) - else: - session.delete(ti) - session.commit() - - task_list = "\n".join(sla.task_id + " on " + sla.execution_date.isoformat() for sla in slas) - blocking_task_list = "\n".join( - ti.task_id + " on " + ti.execution_date.isoformat() for ti in blocking_tis - ) - # Track whether email or any alert notification sent - # We consider email or the alert callback as notifications - email_sent = False - notification_sent = False - if dag.sla_miss_callback: - # Execute the alert callback - callbacks = ( - dag.sla_miss_callback - if isinstance(dag.sla_miss_callback, list) - else [dag.sla_miss_callback] - ) - for callback in callbacks: - cls.logger().info("Calling SLA miss callback %s", callback) - try: - callback(dag, task_list, blocking_task_list, slas, blocking_tis) - notification_sent = True - except Exception: - Stats.incr( - "sla_callback_notification_failure", - tags={ - "dag_id": dag.dag_id, - "func_name": callback.__name__, - }, - ) - cls.logger().exception( - "Could not call sla_miss_callback(%s) for DAG %s", - callback.__name__, - dag.dag_id, - ) - email_content = f"""\ - Here's a list of tasks that missed their SLAs: -
{task_list}\n
- Blocking tasks: -
{blocking_task_list}
- Airflow Webserver URL: {conf.get(section='webserver', key='base_url')} - """ - - tasks_missed_sla = [] - for sla in slas: - try: - task = dag.get_task(sla.task_id) - except TaskNotFound: - # task already deleted from DAG, skip it - cls.logger().warning( - "Task %s doesn't exist in DAG anymore, skipping SLA miss notification.", sla.task_id - ) - else: - tasks_missed_sla.append(task) - - emails: set[str] = set() - for task in tasks_missed_sla: - if task.email: - if isinstance(task.email, str): - emails.update(get_email_address_list(task.email)) - elif isinstance(task.email, (list, tuple)): - emails.update(task.email) - if emails: - try: - send_email(emails, f"[airflow] SLA miss on DAG={dag.dag_id}", email_content) - email_sent = True - notification_sent = True - except Exception: - Stats.incr("sla_email_notification_failure", tags={"dag_id": dag.dag_id}) - cls.logger().exception( - "Could not send SLA Miss email notification for DAG %s", dag.dag_id - ) - # If we sent any notification, update the sla_miss table - if notification_sent: - for sla in slas: - sla.email_sent = email_sent - sla.notification_sent = True - session.merge(sla) - session.commit() - @staticmethod @internal_api_call @provide_session @@ -748,13 +569,6 @@ def execute_callbacks( try: if isinstance(request, TaskCallbackRequest): cls._execute_task_callbacks(dagbag, request, unit_test_mode, session=session) - elif isinstance(request, SlaCallbackRequest): - if InternalApiConfig.get_use_internal_api(): - cls.logger().warning( - "SlaCallbacks are not supported when the Internal API is enabled" - ) - else: - DagFileProcessor.manage_slas(dagbag.dag_folder, request.dag_id, session=session) elif isinstance(request, DagCallbackRequest): cls._execute_dag_callbacks(dagbag, request, session=session) except Exception: diff --git a/airflow/example_dags/example_sla_dag.py b/airflow/example_dags/example_sla_dag.py deleted file mode 100644 index aca1277e8879..000000000000 --- a/airflow/example_dags/example_sla_dag.py +++ /dev/null @@ -1,66 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -"""Example DAG demonstrating SLA use in Tasks""" - -from __future__ import annotations - -import datetime -import time - -import pendulum - -from airflow.decorators import dag, task - - -# [START howto_task_sla] -def sla_callback(dag, task_list, blocking_task_list, slas, blocking_tis): - print( - "The callback arguments are: ", - { - "dag": dag, - "task_list": task_list, - "blocking_task_list": blocking_task_list, - "slas": slas, - "blocking_tis": blocking_tis, - }, - ) - - -@dag( - schedule="*/2 * * * *", - start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), - catchup=False, - sla_miss_callback=sla_callback, - default_args={"email": "email@example.com"}, -) -def example_sla_dag(): - @task(sla=datetime.timedelta(seconds=10)) - def sleep_20(): - """Sleep for 20 seconds""" - time.sleep(20) - - @task - def sleep_30(): - """Sleep for 30 seconds""" - time.sleep(30) - - sleep_20() >> sleep_30() - - -example_dag = example_sla_dag() - -# [END howto_task_sla] diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index a49c2361ec42..9438edd4d918 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -36,7 +36,7 @@ from sqlalchemy.sql import expression from airflow import settings -from airflow.callbacks.callback_requests import DagCallbackRequest, SlaCallbackRequest, TaskCallbackRequest +from airflow.callbacks.callback_requests import DagCallbackRequest, TaskCallbackRequest from airflow.callbacks.pipe_callback_sink import PipeCallbackSink from airflow.configuration import conf from airflow.exceptions import UnknownExecutorException @@ -1724,37 +1724,11 @@ def _verify_integrity_if_dag_changed(self, dag_run: DagRun, session: Session) -> return True def _send_dag_callbacks_to_processor(self, dag: DAG, callback: DagCallbackRequest | None = None) -> None: - self._send_sla_callbacks_to_processor(dag) if callback: self.job.executor.send_callback(callback) else: self.log.debug("callback is empty") - def _send_sla_callbacks_to_processor(self, dag: DAG) -> None: - """Send SLA Callbacks to DagFileProcessor if tasks have SLAs set and check_slas=True.""" - if not settings.CHECK_SLAS: - return - - if not any(isinstance(task.sla, timedelta) for task in dag.tasks): - self.log.debug("Skipping SLA check for %s because no tasks in DAG have SLAs", dag) - return - - if not dag.timetable.periodic: - self.log.debug("Skipping SLA check for %s because DAG is not scheduled", dag) - return - - dag_model = DagModel.get_dagmodel(dag.dag_id) - if not dag_model: - self.log.error("Couldn't find DAG %s in database!", dag.dag_id) - return - - request = SlaCallbackRequest( - full_filepath=dag.fileloc, - dag_id=dag.dag_id, - processor_subdir=dag_model.processor_subdir, - ) - self.job.executor.send_callback(request) - @provide_session def _fail_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None: """ diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py index 8f95d1eee730..20656586ba01 100644 --- a/airflow/models/baseoperator.py +++ b/airflow/models/baseoperator.py @@ -677,17 +677,7 @@ class derived from this one results in the creation of a task object, way to limit concurrency for certain tasks :param pool_slots: the number of pool slots this task should use (>= 1) Values less than 1 are not allowed. - :param sla: time by which the job is expected to succeed. Note that - this represents the ``timedelta`` after the period is closed. For - example if you set an SLA of 1 hour, the scheduler would send an email - soon after 1:00AM on the ``2016-01-02`` if the ``2016-01-01`` instance - has not succeeded yet. - The scheduler pays special attention for jobs with an SLA and - sends alert - emails for SLA misses. SLA misses are also recorded in the database - for future reference. All tasks that share the same SLA time - get bundled in a single email, sent soon after that time. SLA - notification are sent once and only once for each task instance. + :param sla: DEPRECATED - The SLA feature is removed in Airflow 3.0, to be replaced with a new implementation in 3.1 :param execution_timeout: max time allowed for the execution of this task instance, if it goes beyond it will raise and fail. :param on_failure_callback: a function or list of functions to be called when a task instance @@ -975,7 +965,11 @@ def __init__( if self.pool_slots < 1: dag_str = f" in dag {dag.dag_id}" if dag else "" raise ValueError(f"pool slots for {self.task_id}{dag_str} cannot be less than 1") - self.sla = sla + + if sla: + self.log.warning( + "The SLA feature is removed in Airflow 3.0, to be replaced with a new implementation in 3.1" + ) if not TriggerRule.is_valid(trigger_rule): raise AirflowException( diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 215dae298f10..91f8aec7302c 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -42,7 +42,6 @@ Container, Iterable, Iterator, - List, MutableSet, Pattern, Sequence, @@ -146,7 +145,6 @@ from airflow.decorators import TaskDecoratorCollection from airflow.models.dagbag import DagBag from airflow.models.operator import Operator - from airflow.models.slamiss import SlaMiss from airflow.serialization.pydantic.dag import DagModelPydantic from airflow.serialization.pydantic.dag_run import DagRunPydantic from airflow.typing_compat import Literal @@ -169,8 +167,6 @@ Collection[Union["Dataset", "DatasetAlias"]], ] -SLAMissCallback = Callable[["DAG", str, str, List["SlaMiss"], List[TaskInstance]], None] - class InconsistentDataInterval(AirflowException): """ @@ -430,10 +426,7 @@ class DAG(LoggingMixin): beyond this the scheduler will disable the DAG :param dagrun_timeout: Specify the duration a DagRun should be allowed to run before it times out or fails. Task instances that are running when a DagRun is timed out will be marked as skipped. - :param sla_miss_callback: specify a function or list of functions to call when reporting SLA - timeouts. See :ref:`sla_miss_callback` for - more information about the function signature and parameters that are - passed to the callback. + :param sla_miss_callback: DEPRECATED - The SLA feature is removed in Airflow 3.0, to be replaced with a new implementation in 3.1 :param default_view: Specify DAG default view (grid, graph, duration, gantt, landing_times), default grid :param orientation: Specify DAG orientation in graph view (LR, TB, RL, BT), default LR @@ -519,7 +512,7 @@ def __init__( "core", "max_consecutive_failed_dag_runs_per_dag" ), dagrun_timeout: timedelta | None = None, - sla_miss_callback: None | SLAMissCallback | list[SLAMissCallback] = None, + sla_miss_callback: Any = None, default_view: str = airflow_conf.get_mandatory_value("webserver", "dag_default_view").lower(), orientation: str = airflow_conf.get_mandatory_value("webserver", "dag_orientation"), catchup: bool = airflow_conf.getboolean("scheduler", "catchup_by_default"), @@ -639,7 +632,10 @@ def __init__( f"requires max_active_runs <= {self.timetable.active_runs_limit}" ) self.dagrun_timeout = dagrun_timeout - self.sla_miss_callback = sla_miss_callback + if sla_miss_callback: + log.warning( + "The SLA feature is removed in Airflow 3.0, to be replaced with a new implementation in 3.1" + ) if default_view in DEFAULT_VIEW_PRESETS: self._default_view: str = default_view else: @@ -3297,7 +3293,7 @@ def dag( "core", "max_consecutive_failed_dag_runs_per_dag" ), dagrun_timeout: timedelta | None = None, - sla_miss_callback: None | SLAMissCallback | list[SLAMissCallback] = None, + sla_miss_callback: Any = None, default_view: str = airflow_conf.get_mandatory_value("webserver", "dag_default_view").lower(), orientation: str = airflow_conf.get_mandatory_value("webserver", "dag_orientation"), catchup: bool = airflow_conf.getboolean("scheduler", "catchup_by_default"), diff --git a/airflow/models/mappedoperator.py b/airflow/models/mappedoperator.py index 2cb7d993fc9f..8a9e790ea7fc 100644 --- a/airflow/models/mappedoperator.py +++ b/airflow/models/mappedoperator.py @@ -26,7 +26,7 @@ import attr import methodtools -from airflow.exceptions import AirflowException, UnmappableOperator +from airflow.exceptions import UnmappableOperator from airflow.models.abstractoperator import ( DEFAULT_EXECUTOR, DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST, @@ -328,11 +328,6 @@ def __attrs_post_init__(self): for k, v in self.partial_kwargs.items(): if k in self.template_fields: XComArg.apply_upstream_relationship(self, v) - if self.partial_kwargs.get("sla") is not None: - raise AirflowException( - f"SLAs are unsupported with mapped tasks. Please set `sla=None` for task " - f"{self.task_id!r}." - ) @methodtools.lru_cache(maxsize=None) @classmethod @@ -547,14 +542,6 @@ def weight_rule(self) -> PriorityWeightStrategy: # type: ignore[override] def weight_rule(self, value: str | PriorityWeightStrategy) -> None: self.partial_kwargs["weight_rule"] = validate_and_load_priority_weight_strategy(value) - @property - def sla(self) -> datetime.timedelta | None: - return self.partial_kwargs.get("sla") - - @sla.setter - def sla(self, value: datetime.timedelta | None) -> None: - self.partial_kwargs["sla"] = value - @property def max_active_tis_per_dag(self) -> int | None: return self.partial_kwargs.get("max_active_tis_per_dag") diff --git a/airflow/serialization/enums.py b/airflow/serialization/enums.py index f216ce731610..49a3de3d774c 100644 --- a/airflow/serialization/enums.py +++ b/airflow/serialization/enums.py @@ -71,6 +71,5 @@ class DagAttributeTypes(str, Enum): ARG_NOT_SET = "arg_not_set" TASK_CALLBACK_REQUEST = "task_callback_request" DAG_CALLBACK_REQUEST = "dag_callback_request" - SLA_CALLBACK_REQUEST = "sla_callback_request" TASK_INSTANCE_KEY = "task_instance_key" TRIGGER = "trigger" diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index 12310685ec69..c9c1f1183527 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -34,7 +34,7 @@ from pendulum.tz.timezone import FixedTimezone, Timezone from airflow import macros -from airflow.callbacks.callback_requests import DagCallbackRequest, SlaCallbackRequest, TaskCallbackRequest +from airflow.callbacks.callback_requests import DagCallbackRequest, TaskCallbackRequest from airflow.compat.functools import cache from airflow.datasets import ( BaseDataset, @@ -758,8 +758,6 @@ def serialize( return cls._encode(var.to_json(), type_=DAT.TASK_CALLBACK_REQUEST) elif isinstance(var, DagCallbackRequest): return cls._encode(var.to_json(), type_=DAT.DAG_CALLBACK_REQUEST) - elif isinstance(var, SlaCallbackRequest): - return cls._encode(var.to_json(), type_=DAT.SLA_CALLBACK_REQUEST) elif var.__class__ == Context: d = {} for k, v in var._context.items(): @@ -890,8 +888,6 @@ def deserialize(cls, encoded_var: Any, use_pydantic_models=False) -> Any: return TaskCallbackRequest.from_json(var) elif type_ == DAT.DAG_CALLBACK_REQUEST: return DagCallbackRequest.from_json(var) - elif type_ == DAT.SLA_CALLBACK_REQUEST: - return SlaCallbackRequest.from_json(var) elif type_ == DAT.TASK_INSTANCE_KEY: return TaskInstanceKey(**var) elif use_pydantic_models and _ENABLE_AIP_44: @@ -1289,7 +1285,7 @@ def populate_operator(cls, op: Operator, encoded_op: dict[str, Any]) -> None: continue elif k == "downstream_task_ids": v = set(v) - elif k in {"retry_delay", "execution_timeout", "sla", "max_retry_delay"}: + elif k in {"retry_delay", "execution_timeout", "max_retry_delay"}: v = cls._deserialize_timedelta(v) elif k in encoded_op["template_fields"]: pass diff --git a/airflow/settings.py b/airflow/settings.py index a242ce4da769..7a805f64a29c 100644 --- a/airflow/settings.py +++ b/airflow/settings.py @@ -781,9 +781,6 @@ def is_usage_data_collection_enabled() -> bool: ALLOW_FUTURE_EXEC_DATES = conf.getboolean("scheduler", "allow_trigger_in_future", fallback=False) -# Whether or not to check each dagrun against defined SLAs -CHECK_SLAS = conf.getboolean("core", "check_slas", fallback=True) - USE_JOB_SCHEDULE = conf.getboolean("scheduler", "use_job_schedule", fallback=True) # By default Airflow plugins are lazily-loaded (only loaded when required). Set it to False, diff --git a/docs/apache-airflow-providers-amazon/notifications/chime_notifier_howto_guide.rst b/docs/apache-airflow-providers-amazon/notifications/chime_notifier_howto_guide.rst index c10b8cbae414..a52540fe7828 100644 --- a/docs/apache-airflow-providers-amazon/notifications/chime_notifier_howto_guide.rst +++ b/docs/apache-airflow-providers-amazon/notifications/chime_notifier_howto_guide.rst @@ -23,10 +23,6 @@ Introduction Chime notifier (:class:`airflow.providers.amazon.aws.notifications.chime.ChimeNotifier`) allows users to send messages to a Chime chat room setup via a webhook using the various ``on_*_callbacks`` at both the DAG level and Task level -You can also use a notifier with ``sla_miss_callback``. - -.. note:: - When notifiers are used with `sla_miss_callback` the context will contain only values passed to the callback, refer :ref:`sla_miss_callback`. Example Code: ------------- diff --git a/docs/apache-airflow-providers-amazon/notifications/sns.rst b/docs/apache-airflow-providers-amazon/notifications/sns.rst index 337e82cf62eb..bbaad4f81471 100644 --- a/docs/apache-airflow-providers-amazon/notifications/sns.rst +++ b/docs/apache-airflow-providers-amazon/notifications/sns.rst @@ -25,11 +25,6 @@ Introduction `Amazon SNS `__ notifier :class:`~airflow.providers.amazon.aws.notifications.sns.SnsNotifier` allows users to push messages to a SNS Topic using the various ``on_*_callbacks`` at both the DAG level and Task level. -You can also use a notifier with ``sla_miss_callback``. - -.. note:: - When notifiers are used with ``sla_miss_callback`` the context will contain only values passed to the callback, - refer :ref:`sla_miss_callback`. Example Code: ------------- diff --git a/docs/apache-airflow-providers-amazon/notifications/sqs.rst b/docs/apache-airflow-providers-amazon/notifications/sqs.rst index 4a2232b006a0..6951caa9fdd6 100644 --- a/docs/apache-airflow-providers-amazon/notifications/sqs.rst +++ b/docs/apache-airflow-providers-amazon/notifications/sqs.rst @@ -25,11 +25,6 @@ Introduction `Amazon SQS `__ notifier :class:`~airflow.providers.amazon.aws.notifications.sqs.SqsNotifier` allows users to push messages to an Amazon SQS Queue using the various ``on_*_callbacks`` at both the DAG level and Task level. -You can also use a notifier with ``sla_miss_callback``. - -.. note:: - When notifiers are used with ``sla_miss_callback`` the context will contain only values passed to the callback, - refer :ref:`sla_miss_callback`. Example Code: ------------- diff --git a/docs/apache-airflow-providers-pagerduty/notifications/pagerduty_notifier_howto_guide.rst b/docs/apache-airflow-providers-pagerduty/notifications/pagerduty_notifier_howto_guide.rst index d93d5a2fc575..d16f9b2b9e48 100644 --- a/docs/apache-airflow-providers-pagerduty/notifications/pagerduty_notifier_howto_guide.rst +++ b/docs/apache-airflow-providers-pagerduty/notifications/pagerduty_notifier_howto_guide.rst @@ -23,10 +23,6 @@ Introduction The Pagerduty notifier (:class:`airflow.providers.pagerduty.notifications.pagerduty.PagerdutyNotifier`) allows users to send messages to Pagerduty using the various ``on_*_callbacks`` at both the DAG level and Task level. -You can also use a notifier with ``sla_miss_callback``. - -.. note:: - When notifiers are used with `sla_miss_callback` the context will contain only values passed to the callback, refer :ref:`sla_miss_callback`. Example Code: ------------- diff --git a/docs/apache-airflow-providers-slack/notifications/slack_notifier_howto_guide.rst b/docs/apache-airflow-providers-slack/notifications/slack_notifier_howto_guide.rst index d967779cee9c..a4f891f8a57b 100644 --- a/docs/apache-airflow-providers-slack/notifications/slack_notifier_howto_guide.rst +++ b/docs/apache-airflow-providers-slack/notifications/slack_notifier_howto_guide.rst @@ -23,10 +23,6 @@ Introduction Slack notifier (:class:`airflow.providers.slack.notifications.slack.SlackNotifier`) allows users to send messages to a slack channel using the various ``on_*_callbacks`` at both the DAG level and Task level -You can also use a notifier with ``sla_miss_callback``. - -.. note:: - When notifiers are used with `sla_miss_callback` the context will contain only values passed to the callback, refer :ref:`sla_miss_callback`. Example Code: ------------- diff --git a/docs/apache-airflow-providers-slack/notifications/slackwebhook_notifier_howto_guide.rst b/docs/apache-airflow-providers-slack/notifications/slackwebhook_notifier_howto_guide.rst index bb9e85c67466..66ced818a7d1 100644 --- a/docs/apache-airflow-providers-slack/notifications/slackwebhook_notifier_howto_guide.rst +++ b/docs/apache-airflow-providers-slack/notifications/slackwebhook_notifier_howto_guide.rst @@ -24,10 +24,6 @@ Slack Incoming Webhook notifier (:class:`airflow.providers.slack.notifications.s allows users to send messages to a slack channel through `Incoming Webhook `__ using the various ``on_*_callbacks`` at both the DAG level and Task level -You can also use a notifier with ``sla_miss_callback``. - -.. note:: - When notifiers are used with `sla_miss_callback` the context will contain only values passed to the callback, refer :ref:`sla_miss_callback`. Example Code: ------------- diff --git a/docs/apache-airflow-providers-smtp/notifications/smtp_notifier_howto_guide.rst b/docs/apache-airflow-providers-smtp/notifications/smtp_notifier_howto_guide.rst index c7183c5e5687..4cb1bf310e03 100644 --- a/docs/apache-airflow-providers-smtp/notifications/smtp_notifier_howto_guide.rst +++ b/docs/apache-airflow-providers-smtp/notifications/smtp_notifier_howto_guide.rst @@ -23,10 +23,6 @@ Introduction The SMTP notifier (:class:`airflow.providers.smtp.notifications.smtp.SmtpNotifier`) allows users to send messages to SMTP servers using the various ``on_*_callbacks`` at both the DAG level and Task level. -You can also use a notifier with ``sla_miss_callback``. - -.. note:: - When notifiers are used with `sla_miss_callback` the context will contain only values passed to the callback, refer :ref:`sla_miss_callback`. Example Code: ------------- diff --git a/docs/apache-airflow/administration-and-deployment/logging-monitoring/callbacks.rst b/docs/apache-airflow/administration-and-deployment/logging-monitoring/callbacks.rst index a70a876ba347..b54071373cf0 100644 --- a/docs/apache-airflow/administration-and-deployment/logging-monitoring/callbacks.rst +++ b/docs/apache-airflow/administration-and-deployment/logging-monitoring/callbacks.rst @@ -46,7 +46,6 @@ Name Description =========================================== ================================================================ ``on_success_callback`` Invoked when the task :ref:`succeeds ` ``on_failure_callback`` Invoked when the task :ref:`fails ` -``sla_miss_callback`` Invoked when a task misses its defined :ref:`SLA ` ``on_retry_callback`` Invoked when the task is :ref:`up for retry ` ``on_execute_callback`` Invoked right before the task begins executing. ``on_skipped_callback`` Invoked when the task is :ref:`running ` and AirflowSkipException raised. diff --git a/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst b/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst index c8522bee3ba1..61985cecea9b 100644 --- a/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst +++ b/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst @@ -164,7 +164,6 @@ Name Descripti Metric with file_path and action tagging. ``dag_processing.processor_timeouts`` Number of file processors that have been killed due to taking too long. Metric with file_path tagging. -``dag_processing.sla_callback_count`` Number of SLA callbacks received ``dag_processing.other_callback_count`` Number of non-SLA callbacks received ``dag_processing.file_path_queue_update_count`` Number of times we've scanned the filesystem and queued all existing dags ``dag_file_processor_timeouts`` (DEPRECATED) same behavior as ``dag_processing.processor_timeouts`` @@ -176,9 +175,6 @@ Name Descripti ``scheduler.critical_section_busy`` Count of times a scheduler process tried to get a lock on the critical section (needed to send tasks to the executor) and found it locked by another process. -``sla_missed`` Number of SLA misses. Metric with dag_id and task_id tagging. -``sla_callback_notification_failure`` Number of failed SLA miss callback notification attempts. Metric with dag_id and func_name tagging. -``sla_email_notification_failure`` Number of failed SLA miss email notification attempts. Metric with dag_id tagging. ``ti.start..`` Number of started task in a given dag. Similar to _start but for task ``ti.start`` Number of started task in a given dag. Similar to _start but for task. Metric with dag_id and task_id tagging. diff --git a/docs/apache-airflow/core-concepts/tasks.rst b/docs/apache-airflow/core-concepts/tasks.rst index 0e05f55bcf5c..ad03283ef772 100644 --- a/docs/apache-airflow/core-concepts/tasks.rst +++ b/docs/apache-airflow/core-concepts/tasks.rst @@ -149,82 +149,11 @@ is periodically executed and rescheduled until it succeeds. mode="reschedule", ) -If you merely want to be notified if a task runs over but still let it run to completion, you want :ref:`concepts:slas` instead. - - -.. _concepts:slas: SLAs ---- -An SLA, or a Service Level Agreement, is an expectation for the maximum time a Task should be completed relative to the Dag Run start time. If a task takes longer than this to run, it is then visible in the "SLA Misses" part of the user interface, as well as going out in an email of all tasks that missed their SLA. - -Tasks over their SLA are not cancelled, though - they are allowed to run to completion. If you want to cancel a task after a certain runtime is reached, you want :ref:`concepts:timeouts` instead. - -To set an SLA for a task, pass a ``datetime.timedelta`` object to the Task/Operator's ``sla`` parameter. You can also supply an ``sla_miss_callback`` that will be called when the SLA is missed if you want to run your own logic. - -If you want to disable SLA checking entirely, you can set ``check_slas = False`` in Airflow's ``[core]`` configuration. - -To read more about configuring the emails, see :doc:`/howto/email-config`. - -.. note:: - - Manually-triggered tasks and tasks in event-driven DAGs will not be checked for an SLA miss. For more information on DAG ``schedule`` values see :doc:`DAG Run `. - -.. _concepts:sla_miss_callback: - -sla_miss_callback -~~~~~~~~~~~~~~~~~ - -You can also supply an ``sla_miss_callback`` that will be called when the SLA is missed if you want to run your own logic. -The function signature of an ``sla_miss_callback`` requires 5 parameters. - -#. ``dag`` - - * Parent :ref:`DAG ` Object for the :doc:`DAGRun ` in which tasks missed their - :ref:`SLA `. - -#. ``task_list`` - - * String list (new-line separated, \\n) of all tasks that missed their :ref:`SLA ` - since the last time that the ``sla_miss_callback`` ran. - -#. ``blocking_task_list`` - - * Any task in the :doc:`DAGRun(s)` (with the same ``execution_date`` as a task that missed - :ref:`SLA `) that is not in a **SUCCESS** state at the time that the ``sla_miss_callback`` - runs. i.e. 'running', 'failed'. These tasks are described as tasks that are blocking itself or another - task from completing before its SLA window is complete. - -#. ``slas`` - - * List of :py:mod:`SlaMiss` objects associated with the tasks in the - ``task_list`` parameter. - -#. ``blocking_tis`` - - * List of the :ref:`TaskInstance ` objects that are associated with the tasks - in the ``blocking_task_list`` parameter. - -Examples of ``sla_miss_callback`` function signature: - -.. code-block:: python - - def my_sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis): - ... - -.. code-block:: python - - def my_sla_miss_callback(*args): - ... - -Example DAG: - -.. exampleinclude:: /../../airflow/example_dags/example_sla_dag.py - :language: python - :start-after: [START howto_task_sla] - :end-before: [END howto_task_sla] - +The SLA feature from Airflow 2 has been removed in 3.0 and will be replaced with a new implementation in Airflow 3.1 Special Exceptions ------------------ diff --git a/docs/conf.py b/docs/conf.py index c87871e7ede6..4d01e402195a 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -755,7 +755,6 @@ def _get_params(root_schema: dict, prefix: str = "", default_section: str = "") "*/node_modules/*", "*/migrations/*", "*/contrib/*", - "**/example_sla_dag.py", "**/example_taskflow_api_docker_virtualenv.py", "**/example_dag_decorator.py", ] diff --git a/newsfragments/42285.significant.rst b/newsfragments/42285.significant.rst new file mode 100644 index 000000000000..8f8cfa0dee29 --- /dev/null +++ b/newsfragments/42285.significant.rst @@ -0,0 +1 @@ +The SLA feature is removed in Airflow 3.0, to be replaced with Airflow Alerts in 3.1 diff --git a/tests/callbacks/test_callback_requests.py b/tests/callbacks/test_callback_requests.py index 6d900c8bd357..5992ee6fbbf7 100644 --- a/tests/callbacks/test_callback_requests.py +++ b/tests/callbacks/test_callback_requests.py @@ -23,7 +23,6 @@ from airflow.callbacks.callback_requests import ( CallbackRequest, DagCallbackRequest, - SlaCallbackRequest, TaskCallbackRequest, ) from airflow.models.dag import DAG @@ -55,14 +54,6 @@ class TestCallbackRequest: ), DagCallbackRequest, ), - ( - SlaCallbackRequest( - full_filepath="filepath", - dag_id="fake_dag", - processor_subdir="/test_dir", - ), - SlaCallbackRequest, - ), ], ) def test_from_json(self, input, request_class): diff --git a/tests/dag_processing/test_job_runner.py b/tests/dag_processing/test_job_runner.py index 8112b7222a69..1d3fefdf12d5 100644 --- a/tests/dag_processing/test_job_runner.py +++ b/tests/dag_processing/test_job_runner.py @@ -39,7 +39,7 @@ import time_machine from sqlalchemy import func -from airflow.callbacks.callback_requests import CallbackRequest, DagCallbackRequest, SlaCallbackRequest +from airflow.callbacks.callback_requests import CallbackRequest, DagCallbackRequest from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG from airflow.configuration import conf from airflow.dag_processing.manager import ( @@ -1179,16 +1179,10 @@ def test_fetch_callbacks_from_database(self, tmp_path): processor_subdir=os.fspath(tmp_path), run_id="456", ) - callback3 = SlaCallbackRequest( - dag_id="test_start_date_scheduling", - full_filepath=str(dag_filepath), - processor_subdir=os.fspath(tmp_path), - ) with create_session() as session: session.add(DbCallbackRequest(callback=callback1, priority_weight=11)) session.add(DbCallbackRequest(callback=callback2, priority_weight=10)) - session.add(DbCallbackRequest(callback=callback3, priority_weight=9)) child_pipe, parent_pipe = multiprocessing.Pipe() manager = DagProcessorJobRunner( @@ -1371,16 +1365,6 @@ def test_callback_queue(self, tmp_path): processor_subdir=tmp_path, msg=None, ) - dag1_sla1 = SlaCallbackRequest( - full_filepath="/green_eggs/ham/file1.py", - dag_id="dag1", - processor_subdir=tmp_path, - ) - dag1_sla2 = SlaCallbackRequest( - full_filepath="/green_eggs/ham/file1.py", - dag_id="dag1", - processor_subdir=tmp_path, - ) dag2_req1 = DagCallbackRequest( full_filepath="/green_eggs/ham/file2.py", @@ -1391,15 +1375,8 @@ def test_callback_queue(self, tmp_path): msg=None, ) - dag3_sla1 = SlaCallbackRequest( - full_filepath="/green_eggs/ham/file3.py", - dag_id="dag3", - processor_subdir=tmp_path, - ) - # when manager.processor._add_callback_to_queue(dag1_req1) - manager.processor._add_callback_to_queue(dag1_sla1) manager.processor._add_callback_to_queue(dag2_req1) # then - requests should be in manager's queue, with dag2 ahead of dag1 (because it was added last) @@ -1408,18 +1385,10 @@ def test_callback_queue(self, tmp_path): dag1_req1.full_filepath, dag2_req1.full_filepath, } - assert manager.processor._callback_to_execute[dag1_req1.full_filepath] == [dag1_req1, dag1_sla1] assert manager.processor._callback_to_execute[dag2_req1.full_filepath] == [dag2_req1] - # when - manager.processor._add_callback_to_queue(dag1_sla2) - manager.processor._add_callback_to_queue(dag3_sla1) - - # then - since sla2 == sla1, should not have brought dag1 to the fore, and an SLA on dag3 doesn't # update the queue, although the callback is registered assert manager.processor._file_path_queue == deque([dag2_req1.full_filepath, dag1_req1.full_filepath]) - assert manager.processor._callback_to_execute[dag1_req1.full_filepath] == [dag1_req1, dag1_sla1] - assert manager.processor._callback_to_execute[dag3_sla1.full_filepath] == [dag3_sla1] # when manager.processor._add_callback_to_queue(dag1_req2) @@ -1428,7 +1397,6 @@ def test_callback_queue(self, tmp_path): assert manager.processor._file_path_queue == deque([dag1_req1.full_filepath, dag2_req1.full_filepath]) assert manager.processor._callback_to_execute[dag1_req1.full_filepath] == [ dag1_req1, - dag1_sla1, dag1_req2, ] diff --git a/tests/dag_processing/test_processor.py b/tests/dag_processing/test_processor.py index 2b250ae8c55e..d7b2b2116653 100644 --- a/tests/dag_processing/test_processor.py +++ b/tests/dag_processing/test_processor.py @@ -32,10 +32,9 @@ from airflow.configuration import TEST_DAGS_FOLDER, conf from airflow.dag_processing.manager import DagFileProcessorAgent from airflow.dag_processing.processor import DagFileProcessor, DagFileProcessorProcess -from airflow.models import DagBag, DagModel, SlaMiss, TaskInstance +from airflow.models import DagBag, DagModel, TaskInstance from airflow.models.serialized_dag import SerializedDagModel from airflow.models.taskinstance import SimpleTaskInstance -from airflow.operators.empty import EmptyOperator from airflow.utils import timezone from airflow.utils.session import create_session from airflow.utils.state import State @@ -50,7 +49,6 @@ clear_db_pools, clear_db_runs, clear_db_serialized_dags, - clear_db_sla_miss, ) from tests.test_utils.mock_executor import MockExecutor @@ -89,7 +87,6 @@ def clean_db(): clear_db_runs() clear_db_pools() clear_db_dags() - clear_db_sla_miss() clear_db_import_errors() clear_db_jobs() clear_db_serialized_dags() @@ -116,395 +113,6 @@ def _process_file(self, file_path, dag_directory, session): dag_file_processor.process_file(file_path, [], False) - @pytest.mark.skip_if_database_isolation_mode - @mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag") - def test_dag_file_processor_sla_miss_callback(self, mock_get_dagbag, create_dummy_dag, get_test_dag): - """ - Test that the dag file processor calls the sla miss callback - """ - session = settings.Session() - sla_callback = MagicMock() - - # Create dag with a start of 1 day ago, but a sla of 0, so we'll already have a sla_miss on the books. - test_start_date = timezone.utcnow() - datetime.timedelta(days=1) - test_run_id = DagRunType.SCHEDULED.generate_run_id(test_start_date) - dag, task = create_dummy_dag( - dag_id="test_sla_miss", - task_id="dummy", - sla_miss_callback=sla_callback, - default_args={"start_date": test_start_date, "sla": datetime.timedelta()}, - ) - - session.merge( - TaskInstance( - task=task, - run_id=test_run_id, - state=State.SUCCESS, - ) - ) - session.merge(SlaMiss(task_id="dummy", dag_id="test_sla_miss", execution_date=test_start_date)) - - mock_dagbag = mock.Mock() - mock_dagbag.get_dag.return_value = dag - mock_get_dagbag.return_value = mock_dagbag - session.commit() - - DagFileProcessor.manage_slas(dag_folder=dag.fileloc, dag_id="test_sla_miss", session=session) - - assert sla_callback.called - - @pytest.mark.skip_if_database_isolation_mode - @mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag") - def test_dag_file_processor_sla_miss_callback_invalid_sla(self, mock_get_dagbag, create_dummy_dag): - """ - Test that the dag file processor does not call the sla miss callback when - given an invalid sla - """ - session = settings.Session() - - sla_callback = MagicMock() - - # Create dag with a start of 1 day ago, but an sla of 0 - # so we'll already have an sla_miss on the books. - # Pass anything besides a timedelta object to the sla argument. - test_start_date = timezone.utcnow() - datetime.timedelta(days=1) - test_run_id = DagRunType.SCHEDULED.generate_run_id(test_start_date) - dag, task = create_dummy_dag( - dag_id="test_sla_miss", - task_id="dummy", - sla_miss_callback=sla_callback, - default_args={"start_date": test_start_date, "sla": None}, - ) - - session.merge(TaskInstance(task=task, run_id=test_run_id, state=State.SUCCESS)) - session.merge(SlaMiss(task_id="dummy", dag_id="test_sla_miss", execution_date=test_start_date)) - - mock_dagbag = mock.Mock() - mock_dagbag.get_dag.return_value = dag - mock_get_dagbag.return_value = mock_dagbag - - DagFileProcessor.manage_slas(dag_folder=dag.fileloc, dag_id="test_sla_miss", session=session) - sla_callback.assert_not_called() - - @pytest.mark.skip_if_database_isolation_mode - @mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag") - def test_dag_file_processor_sla_miss_callback_sent_notification(self, mock_get_dagbag, create_dummy_dag): - """ - Test that the dag file processor does not call the sla_miss_callback when a - notification has already been sent - """ - session = settings.Session() - - # Mock the callback function so we can verify that it was not called - sla_callback = MagicMock() - - # Create dag with a start of 2 days ago, but an sla of 1 day - # ago so we'll already have an sla_miss on the books - test_start_date = timezone.utcnow() - datetime.timedelta(days=2) - test_run_id = DagRunType.SCHEDULED.generate_run_id(test_start_date) - dag, task = create_dummy_dag( - dag_id="test_sla_miss", - task_id="dummy", - sla_miss_callback=sla_callback, - default_args={"start_date": test_start_date, "sla": datetime.timedelta(days=1)}, - ) - - # Create a TaskInstance for two days ago - session.merge(TaskInstance(task=task, run_id=test_run_id, state=State.SUCCESS)) - - # Create an SlaMiss where notification was sent, but email was not - session.merge( - SlaMiss( - task_id="dummy", - dag_id="test_sla_miss", - execution_date=test_start_date, - email_sent=False, - notification_sent=True, - ) - ) - - mock_dagbag = mock.Mock() - mock_dagbag.get_dag.return_value = dag - mock_get_dagbag.return_value = mock_dagbag - - # Now call manage_slas and see if the sla_miss callback gets called - DagFileProcessor.manage_slas(dag_folder=dag.fileloc, dag_id="test_sla_miss", session=session) - - sla_callback.assert_not_called() - - @pytest.mark.skip_if_database_isolation_mode - @mock.patch("airflow.dag_processing.processor.Stats.incr") - @mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag") - def test_dag_file_processor_sla_miss_doesnot_raise_integrity_error( - self, mock_get_dagbag, mock_stats_incr, dag_maker - ): - """ - Test that the dag file processor does not try to insert already existing item into the database - """ - session = settings.Session() - - # Create dag with a start of 2 days ago, but an sla of 1 day - # ago so we'll already have an sla_miss on the books - test_start_date = timezone.utcnow() - datetime.timedelta(days=2) - with dag_maker( - dag_id="test_sla_miss", - default_args={"start_date": test_start_date, "sla": datetime.timedelta(days=1)}, - ) as dag: - task = EmptyOperator(task_id="dummy") - - dr = dag_maker.create_dagrun(execution_date=test_start_date, state=State.SUCCESS) - - # Create a TaskInstance for two days ago - ti = TaskInstance(task=task, run_id=dr.run_id, state=State.SUCCESS) - session.merge(ti) - session.flush() - - mock_dagbag = mock.Mock() - mock_dagbag.get_dag.return_value = dag - mock_get_dagbag.return_value = mock_dagbag - - DagFileProcessor.manage_slas(dag_folder=dag.fileloc, dag_id="test_sla_miss", session=session) - sla_miss_count = ( - session.query(SlaMiss) - .filter( - SlaMiss.dag_id == dag.dag_id, - SlaMiss.task_id == task.task_id, - ) - .count() - ) - assert sla_miss_count == 1 - mock_stats_incr.assert_called_with("sla_missed", tags={"dag_id": "test_sla_miss", "task_id": "dummy"}) - # Now call manage_slas and see that it runs without errors - # because of existing SlaMiss above. - # Since this is run often, it's possible that it runs before another - # ti is successful thereby trying to insert a duplicate record. - DagFileProcessor.manage_slas(dag_folder=dag.fileloc, dag_id="test_sla_miss", session=session) - - @pytest.mark.skip_if_database_isolation_mode - @mock.patch("airflow.dag_processing.processor.Stats.incr") - @mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag") - def test_dag_file_processor_sla_miss_continue_checking_the_task_instances_after_recording_missing_sla( - self, mock_get_dagbag, mock_stats_incr, dag_maker - ): - """ - Test that the dag file processor continue checking subsequent task instances - even if the preceding task instance misses the sla ahead - """ - session = settings.Session() - - # Create a dag with a start of 3 days ago and sla of 1 day, - # so we have 2 missing slas - now = timezone.utcnow() - test_start_date = now - datetime.timedelta(days=3) - # test_run_id = DagRunType.SCHEDULED.generate_run_id(test_start_date) - with dag_maker( - dag_id="test_sla_miss", - default_args={"start_date": test_start_date, "sla": datetime.timedelta(days=1)}, - ) as dag: - task = EmptyOperator(task_id="dummy") - - dr = dag_maker.create_dagrun(execution_date=test_start_date, state=State.SUCCESS) - - session.merge(TaskInstance(task=task, run_id=dr.run_id, state="success")) - session.merge( - SlaMiss(task_id=task.task_id, dag_id=dag.dag_id, execution_date=now - datetime.timedelta(days=2)) - ) - session.flush() - - mock_dagbag = mock.Mock() - mock_dagbag.get_dag.return_value = dag - mock_get_dagbag.return_value = mock_dagbag - - DagFileProcessor.manage_slas(dag_folder=dag.fileloc, dag_id="test_sla_miss", session=session) - sla_miss_count = ( - session.query(SlaMiss) - .filter( - SlaMiss.dag_id == dag.dag_id, - SlaMiss.task_id == task.task_id, - ) - .count() - ) - assert sla_miss_count == 2 - mock_stats_incr.assert_called_with("sla_missed", tags={"dag_id": "test_sla_miss", "task_id": "dummy"}) - - @pytest.mark.skip_if_database_isolation_mode - @patch.object(DagFileProcessor, "logger") - @mock.patch("airflow.dag_processing.processor.Stats.incr") - @mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag") - def test_dag_file_processor_sla_miss_callback_exception( - self, - mock_get_dagbag, - mock_stats_incr, - mock_get_log, - create_dummy_dag, - ): - """ - Test that the dag file processor gracefully logs an exception if there is a problem - calling the sla_miss_callback - """ - session = settings.Session() - - sla_callback = MagicMock( - __name__="function_name", side_effect=RuntimeError("Could not call function") - ) - - test_start_date = timezone.utcnow() - datetime.timedelta(days=1) - test_run_id = DagRunType.SCHEDULED.generate_run_id(test_start_date) - - for i, callback in enumerate([[sla_callback], sla_callback]): - dag, task = create_dummy_dag( - dag_id=f"test_sla_miss_{i}", - task_id="dummy", - sla_miss_callback=callback, - default_args={"start_date": test_start_date, "sla": datetime.timedelta(hours=1)}, - ) - mock_stats_incr.reset_mock() - - session.merge(TaskInstance(task=task, run_id=test_run_id, state=State.SUCCESS)) - - # Create an SlaMiss where notification was sent, but email was not - session.merge( - SlaMiss(task_id="dummy", dag_id=f"test_sla_miss_{i}", execution_date=test_start_date) - ) - - # Now call manage_slas and see if the sla_miss callback gets called - mock_log = mock.Mock() - mock_get_log.return_value = mock_log - mock_dagbag = mock.Mock() - mock_dagbag.get_dag.return_value = dag - mock_get_dagbag.return_value = mock_dagbag - - DagFileProcessor.manage_slas(dag_folder=dag.fileloc, dag_id="test_sla_miss", session=session) - assert sla_callback.called - mock_log.exception.assert_called_once_with( - "Could not call sla_miss_callback(%s) for DAG %s", - sla_callback.__name__, - f"test_sla_miss_{i}", - ) - mock_stats_incr.assert_called_once_with( - "sla_callback_notification_failure", - tags={"dag_id": f"test_sla_miss_{i}", "func_name": sla_callback.__name__}, - ) - - @pytest.mark.skip_if_database_isolation_mode - @mock.patch("airflow.dag_processing.processor.send_email") - @mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag") - def test_dag_file_processor_only_collect_emails_from_sla_missed_tasks( - self, mock_get_dagbag, mock_send_email, create_dummy_dag - ): - session = settings.Session() - - test_start_date = timezone.utcnow() - datetime.timedelta(days=1) - test_run_id = DagRunType.SCHEDULED.generate_run_id(test_start_date) - email1 = "test1@test.com" - dag, task = create_dummy_dag( - dag_id="test_sla_miss", - task_id="sla_missed", - email=email1, - default_args={"start_date": test_start_date, "sla": datetime.timedelta(hours=1)}, - ) - session.merge(TaskInstance(task=task, run_id=test_run_id, state=State.SUCCESS)) - - email2 = "test2@test.com" - EmptyOperator(task_id="sla_not_missed", dag=dag, owner="airflow", email=email2) - - session.merge(SlaMiss(task_id="sla_missed", dag_id="test_sla_miss", execution_date=test_start_date)) - - mock_dagbag = mock.Mock() - mock_dagbag.get_dag.return_value = dag - mock_get_dagbag.return_value = mock_dagbag - - DagFileProcessor.manage_slas(dag_folder=dag.fileloc, dag_id="test_sla_miss", session=session) - - assert len(mock_send_email.call_args_list) == 1 - - send_email_to = mock_send_email.call_args_list[0][0][0] - assert email1 in send_email_to - assert email2 not in send_email_to - - @pytest.mark.skip_if_database_isolation_mode - @patch.object(DagFileProcessor, "logger") - @mock.patch("airflow.dag_processing.processor.Stats.incr") - @mock.patch("airflow.utils.email.send_email") - @mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag") - def test_dag_file_processor_sla_miss_email_exception( - self, - mock_get_dagbag, - mock_send_email, - mock_stats_incr, - mock_get_log, - create_dummy_dag, - ): - """ - Test that the dag file processor gracefully logs an exception if there is a problem - sending an email - """ - session = settings.Session() - dag_id = "test_sla_miss" - task_id = "test_ti" - email = "test@test.com" - - # Mock the callback function so we can verify that it was not called - mock_send_email.side_effect = RuntimeError("Could not send an email") - - test_start_date = timezone.utcnow() - datetime.timedelta(days=1) - test_run_id = DagRunType.SCHEDULED.generate_run_id(test_start_date) - dag, task = create_dummy_dag( - dag_id=dag_id, - task_id=task_id, - email=email, - default_args={"start_date": test_start_date, "sla": datetime.timedelta(hours=1)}, - ) - mock_stats_incr.reset_mock() - - session.merge(TaskInstance(task=task, run_id=test_run_id, state=State.SUCCESS)) - - # Create an SlaMiss where notification was sent, but email was not - session.merge(SlaMiss(task_id=task_id, dag_id=dag_id, execution_date=test_start_date)) - - mock_log = mock.Mock() - mock_get_log.return_value = mock_log - mock_dagbag = mock.Mock() - mock_dagbag.get_dag.return_value = dag - mock_get_dagbag.return_value = mock_dagbag - - DagFileProcessor.manage_slas(dag_folder=dag.fileloc, dag_id=dag_id, session=session) - mock_log.exception.assert_called_once_with( - "Could not send SLA Miss email notification for DAG %s", dag_id - ) - mock_stats_incr.assert_called_once_with("sla_email_notification_failure", tags={"dag_id": dag_id}) - - @pytest.mark.skip_if_database_isolation_mode - @mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag") - def test_dag_file_processor_sla_miss_deleted_task(self, mock_get_dagbag, create_dummy_dag): - """ - Test that the dag file processor will not crash when trying to send - sla miss notification for a deleted task - """ - session = settings.Session() - - test_start_date = timezone.utcnow() - datetime.timedelta(days=1) - test_run_id = DagRunType.SCHEDULED.generate_run_id(test_start_date) - dag, task = create_dummy_dag( - dag_id="test_sla_miss", - task_id="dummy", - email="test@test.com", - default_args={"start_date": test_start_date, "sla": datetime.timedelta(hours=1)}, - ) - - session.merge(TaskInstance(task=task, run_id=test_run_id, state=State.SUCCESS)) - - # Create an SlaMiss where notification was sent, but email was not - session.merge( - SlaMiss(task_id="dummy_deleted", dag_id="test_sla_miss", execution_date=test_start_date) - ) - - mock_dagbag = mock.Mock() - mock_dagbag.get_dag.return_value = dag - mock_get_dagbag.return_value = mock_dagbag - - DagFileProcessor.manage_slas(dag_folder=dag.fileloc, dag_id="test_sla_miss", session=session) - @pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode @patch.object(TaskInstance, "handle_failure") def test_execute_on_failure_callbacks(self, mock_ti_handle_failure): diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 4067f4fa1790..48b68536d573 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -36,7 +36,7 @@ import airflow.example_dags from airflow import settings -from airflow.callbacks.callback_requests import DagCallbackRequest, SlaCallbackRequest, TaskCallbackRequest +from airflow.callbacks.callback_requests import DagCallbackRequest, TaskCallbackRequest from airflow.callbacks.database_callback_sink import DatabaseCallbackSink from airflow.callbacks.pipe_callback_sink import PipeCallbackSink from airflow.dag_processing.manager import DagFileProcessorAgent @@ -3987,82 +3987,6 @@ def test_adopt_or_reset_orphaned_tasks_only_fails_scheduler_jobs(self, caplog): assert old_task_job.state == State.RUNNING assert "Marked 1 SchedulerJob instances as failed" in caplog.messages - def test_send_sla_callbacks_to_processor_sla_disabled(self, dag_maker): - """Test SLA Callbacks are not sent when check_slas is False""" - dag_id = "test_send_sla_callbacks_to_processor_sla_disabled" - with dag_maker(dag_id=dag_id, schedule="@daily") as dag: - EmptyOperator(task_id="task1") - - with patch.object(settings, "CHECK_SLAS", False): - scheduler_job = Job() - self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull) - scheduler_job.executor = MockExecutor() - self.job_runner._send_sla_callbacks_to_processor(dag) - scheduler_job.executor.callback_sink.send.assert_not_called() - - def test_send_sla_callbacks_to_processor_sla_no_task_slas(self, dag_maker): - """Test SLA Callbacks are not sent when no task SLAs are defined""" - dag_id = "test_send_sla_callbacks_to_processor_sla_no_task_slas" - with dag_maker(dag_id=dag_id, schedule="@daily") as dag: - EmptyOperator(task_id="task1") - - with patch.object(settings, "CHECK_SLAS", True): - scheduler_job = Job() - self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull) - scheduler_job.executor = MockExecutor() - self.job_runner._send_sla_callbacks_to_processor(dag) - scheduler_job.executor.callback_sink.send.assert_not_called() - - @pytest.mark.parametrize( - "schedule", - [ - "@daily", - "0 10 * * *", - timedelta(hours=2), - ], - ) - def test_send_sla_callbacks_to_processor_sla_with_task_slas(self, schedule, dag_maker): - """Test SLA Callbacks are sent to the DAG Processor when SLAs are defined on tasks""" - dag_id = "test_send_sla_callbacks_to_processor_sla_with_task_slas" - with dag_maker( - dag_id=dag_id, - schedule=schedule, - processor_subdir=TEST_DAG_FOLDER, - ) as dag: - EmptyOperator(task_id="task1", sla=timedelta(seconds=60)) - - with patch.object(settings, "CHECK_SLAS", True): - scheduler_job = Job() - self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull) - scheduler_job.executor = MockExecutor() - self.job_runner._send_sla_callbacks_to_processor(dag) - expected_callback = SlaCallbackRequest( - full_filepath=dag.fileloc, - dag_id=dag.dag_id, - processor_subdir=TEST_DAG_FOLDER, - ) - scheduler_job.executor.callback_sink.send.assert_called_once_with(expected_callback) - - @pytest.mark.parametrize( - "schedule", - [ - None, - [Dataset("foo")], - ], - ) - def test_send_sla_callbacks_to_processor_sla_dag_not_scheduled(self, schedule, dag_maker): - """Test SLA Callbacks are not sent when DAG isn't scheduled""" - dag_id = "test_send_sla_callbacks_to_processor_sla_no_task_slas" - with dag_maker(dag_id=dag_id, schedule=schedule) as dag: - EmptyOperator(task_id="task1", sla=timedelta(seconds=5)) - - with patch.object(settings, "CHECK_SLAS", True): - scheduler_job = Job() - self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull) - scheduler_job.executor = MockExecutor() - self.job_runner._send_sla_callbacks_to_processor(dag) - scheduler_job.executor.callback_sink.send.assert_not_called() - @pytest.mark.parametrize( "schedule, number_running, excepted", [ diff --git a/tests/models/test_baseoperator.py b/tests/models/test_baseoperator.py index 2aa5b76b22c0..3c5b7634d5a9 100644 --- a/tests/models/test_baseoperator.py +++ b/tests/models/test_baseoperator.py @@ -304,51 +304,6 @@ def test_render_template_with_native_envs(self, content, context, expected_outpu result = task.render_template(content, context) assert result == expected_output - def test_mapped_dag_slas_disabled_classic(self): - class MyOp(BaseOperator): - def __init__(self, x, **kwargs): - self.x = x - super().__init__(**kwargs) - - def execute(self, context): - print(self.x) - - with DAG( - dag_id="test-dag", - schedule=None, - start_date=DEFAULT_DATE, - default_args={"sla": timedelta(minutes=30)}, - ) as dag: - - @dag.task - def get_values(): - return [0, 1, 2] - - task1 = get_values() - with pytest.raises(AirflowException, match="SLAs are unsupported with mapped tasks"): - MyOp.partial(task_id="hi").expand(x=task1) - - def test_mapped_dag_slas_disabled_taskflow(self): - with DAG( - dag_id="test-dag", - schedule=None, - start_date=DEFAULT_DATE, - default_args={"sla": timedelta(minutes=30)}, - ) as dag: - - @dag.task - def get_values(): - return [0, 1, 2] - - task1 = get_values() - - @dag.task - def print_val(x): - print(x) - - with pytest.raises(AirflowException, match="SLAs are unsupported with mapped tasks"): - print_val.expand(x=task1) - @pytest.mark.db_test def test_render_template_fields(self): """Verify if operator attributes are correctly templated.""" diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py index 7dfe57054c60..758c7f496ed9 100644 --- a/tests/serialization/test_dag_serialization.py +++ b/tests/serialization/test_dag_serialization.py @@ -143,7 +143,6 @@ def detect_task_dependencies(task: Operator) -> DagDependency | None: # type: i "retries": 1, "retry_delay": {"__type": "timedelta", "__var": 300.0}, "max_retry_delay": {"__type": "timedelta", "__var": 600.0}, - "sla": {"__type": "timedelta", "__var": 100.0}, }, }, "start_date": 1564617600.0, @@ -179,7 +178,6 @@ def detect_task_dependencies(task: Operator) -> DagDependency | None: # type: i "retries": 1, "retry_delay": 300.0, "max_retry_delay": 600.0, - "sla": 100.0, "downstream_task_ids": [], "_is_empty": False, "ui_color": "#f0ede4", @@ -218,7 +216,6 @@ def detect_task_dependencies(task: Operator) -> DagDependency | None: # type: i "retries": 1, "retry_delay": 300.0, "max_retry_delay": 600.0, - "sla": 100.0, "downstream_task_ids": [], "_is_empty": False, "_operator_extra_links": [{"tests.test_utils.mock_operators.CustomOpLink": {}}], @@ -290,7 +287,6 @@ def make_simple_dag(): "retry_delay": timedelta(minutes=5), "max_retry_delay": timedelta(minutes=10), "depends_on_past": False, - "sla": timedelta(seconds=100), }, start_date=datetime(2019, 8, 1), is_paused_upon_creation=False, @@ -1299,7 +1295,6 @@ def test_no_new_fields_added_to_base_operator(self): "retry_delay": timedelta(0, 300), "retry_exponential_backoff": False, "run_as_user": None, - "sla": None, "task_id": "10", "trigger_rule": "all_success", "wait_for_downstream": False,